This is an automated email from the ASF dual-hosted git repository. gortiz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new bdef83b749 Add metrics to count joins and window functions (#13032) bdef83b749 is described below commit bdef83b749fe293954d785418232c5ee0ed22f12 Author: Gonzalo Ortiz Jaureguizar <gor...@users.noreply.github.com> AuthorDate: Wed Jun 5 11:20:42 2024 +0200 Add metrics to count joins and window functions (#13032) This PR adds 4 new metrics: * queriesWithJoins (global): How many queries with joins have been executed. For each query with at least one join, this meter is increased exactly once. * joinCount (global): How many joins have been executed. For each query with at least one join, this meter is increased as many times as joins in the query. * queriesWithWindow (global): How many queries with window functions have been executed. For each query with at least one window function, this meter is increased exactly once. * windowCount (global): How many window functions have been executed. For each query with at least one window function, this meter is increased as many times as window functions in the query. --- .../apache/pinot/common/metrics/BrokerMeter.java | 28 ++- .../apache/pinot/common/metrics/BrokerMetrics.java | 10 +- .../pinot/common/metrics/ControllerMetrics.java | 6 +- .../apache/pinot/common/metrics/MinionMetrics.java | 6 +- .../apache/pinot/common/metrics/ServerMetrics.java | 9 +- .../org/apache/pinot/query/QueryEnvironment.java | 3 +- .../planner/logical/PinotLogicalQueryPlanner.java | 33 ++- .../annotations/metrics/PinotMetricsFactory.java | 34 +++ .../spi/metrics/NoopPinotMetricsRegistry.java | 228 +++++++++++++++++++++ .../apache/pinot/spi/metrics/PinotMetricUtils.java | 13 +- 10 files changed, 346 insertions(+), 24 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java index 006ee458eb..d6b7af9840 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java @@ -121,7 +121,33 @@ public enum BrokerMeter implements AbstractMetrics.Meter { NETTY_CONNECTION_BYTES_RECEIVED("nettyConnection", true), PROACTIVE_CLUSTER_CHANGE_CHECK("proactiveClusterChangeCheck", true), - DIRECT_MEMORY_OOM("directMemoryOOMCount", true); + DIRECT_MEMORY_OOM("directMemoryOOMCount", true), + + /** + * How many queries with joins have been executed. + * <p> + * For each query with at least one join, this meter is increased exactly once. + */ + QUERIES_WITH_JOINS("queries", true), + /** + * How many joins have been executed. + * <p> + * For each query with at least one join, this meter is increased as many times as joins in the query. + */ + JOIN_COUNT("queries", true), + /** + * How many queries with window functions have been executed. + * <p> + * For each query with at least one window function, this meter is increased exactly once. + */ + QUERIES_WITH_WINDOW("queries", true), + /** + * How many window functions have been executed. + * <p> + * For each query with at least one window function, this meter is increased as many times as window functions in the + * query. + */ + WINDOW_COUNT("queries", true),; private final String _brokerMeterName; private final String _unit; diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMetrics.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMetrics.java index cf6a310eb2..76dc8e2e6b 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMetrics.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMetrics.java @@ -21,6 +21,7 @@ package org.apache.pinot.common.metrics; import java.util.Collection; import java.util.Collections; import java.util.concurrent.atomic.AtomicReference; +import org.apache.pinot.spi.metrics.NoopPinotMetricsRegistry; import org.apache.pinot.spi.metrics.PinotMetricsRegistry; import static org.apache.pinot.spi.utils.CommonConstants.Broker.DEFAULT_ENABLE_TABLE_LEVEL_METRICS; @@ -33,22 +34,21 @@ import static org.apache.pinot.spi.utils.CommonConstants.Broker.DEFAULT_METRICS_ */ public class BrokerMetrics extends AbstractMetrics<BrokerQueryPhase, BrokerMeter, BrokerGauge, BrokerTimer> { - private static final AtomicReference<BrokerMetrics> BROKER_METRICS_INSTANCE = new AtomicReference<>(); + private static final BrokerMetrics NOOP = new BrokerMetrics(new NoopPinotMetricsRegistry()); + private static final AtomicReference<BrokerMetrics> BROKER_METRICS_INSTANCE = new AtomicReference<>(NOOP); /** * register the brokerMetrics onto this class, so that we don't need to pass it down as a parameter */ public static boolean register(BrokerMetrics brokerMetrics) { - return BROKER_METRICS_INSTANCE.compareAndSet(null, brokerMetrics); + return BROKER_METRICS_INSTANCE.compareAndSet(NOOP, brokerMetrics); } /** * should always call after registration */ public static BrokerMetrics get() { - BrokerMetrics ret = BROKER_METRICS_INSTANCE.get(); - assert ret != null; - return ret; + return BROKER_METRICS_INSTANCE.get(); } /** diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMetrics.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMetrics.java index d7e75ca54c..d40fd26174 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMetrics.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMetrics.java @@ -19,6 +19,7 @@ package org.apache.pinot.common.metrics; import java.util.concurrent.atomic.AtomicReference; +import org.apache.pinot.spi.metrics.NoopPinotMetricsRegistry; import org.apache.pinot.spi.metrics.PinotMetricsRegistry; import static org.apache.pinot.spi.utils.CommonConstants.Controller.DEFAULT_METRICS_PREFIX; @@ -29,10 +30,11 @@ import static org.apache.pinot.spi.utils.CommonConstants.Controller.DEFAULT_METR */ public class ControllerMetrics extends AbstractMetrics<AbstractMetrics.QueryPhase, ControllerMeter, ControllerGauge, ControllerTimer> { - private static final AtomicReference<ControllerMetrics> CONTROLLER_METRICS_INSTANCE = new AtomicReference<>(); + private static final ControllerMetrics NOOP = new ControllerMetrics(new NoopPinotMetricsRegistry()); + private static final AtomicReference<ControllerMetrics> CONTROLLER_METRICS_INSTANCE = new AtomicReference<>(NOOP); public static boolean register(ControllerMetrics controllerMetrics) { - return CONTROLLER_METRICS_INSTANCE.compareAndSet(null, controllerMetrics); + return CONTROLLER_METRICS_INSTANCE.compareAndSet(NOOP, controllerMetrics); } public static ControllerMetrics get() { diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/MinionMetrics.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/MinionMetrics.java index d716523e9d..e0df446c6b 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/MinionMetrics.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/MinionMetrics.java @@ -19,15 +19,17 @@ package org.apache.pinot.common.metrics; import java.util.concurrent.atomic.AtomicReference; +import org.apache.pinot.spi.metrics.NoopPinotMetricsRegistry; import org.apache.pinot.spi.metrics.PinotMetricsRegistry; import org.apache.pinot.spi.utils.CommonConstants; public class MinionMetrics extends AbstractMetrics<MinionQueryPhase, MinionMeter, MinionGauge, MinionTimer> { - private static final AtomicReference<MinionMetrics> MINION_METRICS_INSTANCE = new AtomicReference<>(); + private static final MinionMetrics NOOP = new MinionMetrics(new NoopPinotMetricsRegistry()); + private static final AtomicReference<MinionMetrics> MINION_METRICS_INSTANCE = new AtomicReference<>(NOOP); public static boolean register(MinionMetrics minionMetrics) { - return MINION_METRICS_INSTANCE.compareAndSet(null, minionMetrics); + return MINION_METRICS_INSTANCE.compareAndSet(NOOP, minionMetrics); } public static MinionMetrics get() { diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMetrics.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMetrics.java index c2cb9ce872..7e253d0012 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMetrics.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMetrics.java @@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting; import java.util.Collection; import java.util.Collections; import java.util.concurrent.atomic.AtomicReference; +import org.apache.pinot.spi.metrics.NoopPinotMetricsRegistry; import org.apache.pinot.spi.metrics.PinotMetricsRegistry; import static org.apache.pinot.spi.utils.CommonConstants.Server.DEFAULT_ENABLE_TABLE_LEVEL_METRICS; @@ -34,18 +35,20 @@ import static org.apache.pinot.spi.utils.CommonConstants.Server.DEFAULT_METRICS_ */ public class ServerMetrics extends AbstractMetrics<ServerQueryPhase, ServerMeter, ServerGauge, ServerTimer> { - private static final AtomicReference<ServerMetrics> SERVER_METRICS_INSTANCE = new AtomicReference<>(); + private static final ServerMetrics NOOP = new ServerMetrics(new NoopPinotMetricsRegistry()); + + private static final AtomicReference<ServerMetrics> SERVER_METRICS_INSTANCE = new AtomicReference<>(NOOP); /** * register the serverMetrics onto this class, so that we don't need to pass it down as a parameter */ public static boolean register(ServerMetrics serverMetrics) { - return SERVER_METRICS_INSTANCE.compareAndSet(null, serverMetrics); + return SERVER_METRICS_INSTANCE.compareAndSet(NOOP, serverMetrics); } @VisibleForTesting public static void deregister() { - SERVER_METRICS_INSTANCE.set(null); + SERVER_METRICS_INSTANCE.set(NOOP); } /** diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java index 9c53cdee6a..ec6c95fc97 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java @@ -291,7 +291,8 @@ public class QueryEnvironment { } private DispatchableSubPlan toDispatchableSubPlan(RelRoot relRoot, PlannerContext plannerContext, long requestId) { - SubPlan plan = PinotLogicalQueryPlanner.makePlan(relRoot); + PinotLogicalQueryPlanner logicalQueryPlanner = new PinotLogicalQueryPlanner(); + SubPlan plan = logicalQueryPlanner.makePlan(relRoot); PinotDispatchPlanner pinotDispatchPlanner = new PinotDispatchPlanner(plannerContext, _workerManager, requestId, _tableCache); return pinotDispatchPlanner.createDispatchableSubPlan(plan); diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java index 9e7703b7ef..a37079ae35 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/PinotLogicalQueryPlanner.java @@ -29,25 +29,30 @@ import org.apache.calcite.rel.RelDistribution; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.RelRoot; import org.apache.calcite.rel.logical.PinotRelExchangeType; +import org.apache.pinot.common.metrics.BrokerMeter; +import org.apache.pinot.common.metrics.BrokerMetrics; import org.apache.pinot.query.planner.PlanFragment; import org.apache.pinot.query.planner.SubPlan; import org.apache.pinot.query.planner.SubPlanMetadata; +import org.apache.pinot.query.planner.plannode.JoinNode; import org.apache.pinot.query.planner.plannode.MailboxReceiveNode; import org.apache.pinot.query.planner.plannode.MailboxSendNode; import org.apache.pinot.query.planner.plannode.PlanNode; +import org.apache.pinot.query.planner.plannode.WindowNode; /** * PinotLogicalQueryPlanner walks top-down from {@link RelRoot} and construct a forest of trees with {@link PlanNode}. */ public class PinotLogicalQueryPlanner { - private PinotLogicalQueryPlanner() { - } + + private boolean _windowFunctionFound = false; + private boolean _joinFound = false; /** * Converts a Calcite {@link RelRoot} into a Pinot {@link SubPlan}. */ - public static SubPlan makePlan(RelRoot relRoot) { + public SubPlan makePlan(RelRoot relRoot) { PlanNode rootNode = relNodeToPlanNode(relRoot.rel); PlanFragment rootFragment = planNodeToPlanFragment(rootNode); return new SubPlan(rootFragment, @@ -79,8 +84,26 @@ public class PinotLogicalQueryPlanner { // return subPlanMap.get(0); } - private static PlanNode relNodeToPlanNode(RelNode node) { + private PlanNode relNodeToPlanNode(RelNode node) { PlanNode planNode = RelToPlanNodeConverter.toPlanNode(node, -1); + + if (planNode instanceof JoinNode) { + BrokerMetrics brokerMetrics = BrokerMetrics.get(); + brokerMetrics.addMeteredGlobalValue(BrokerMeter.JOIN_COUNT, 1); + if (!_joinFound) { + brokerMetrics.addMeteredGlobalValue(BrokerMeter.QUERIES_WITH_JOINS, 1); + _joinFound = true; + } + } + if (planNode instanceof WindowNode) { + BrokerMetrics brokerMetrics = BrokerMetrics.get(); + brokerMetrics.addMeteredGlobalValue(BrokerMeter.WINDOW_COUNT, 1); + if (!_windowFunctionFound) { + brokerMetrics.addMeteredGlobalValue(BrokerMeter.QUERIES_WITH_WINDOW, 1); + _windowFunctionFound = true; + } + } + List<RelNode> inputs = node.getInputs(); for (RelNode input : inputs) { planNode.addInput(relNodeToPlanNode(input)); @@ -88,7 +111,7 @@ public class PinotLogicalQueryPlanner { return planNode; } - private static PlanFragment planNodeToPlanFragment(PlanNode node) { + private PlanFragment planNodeToPlanFragment(PlanNode node) { PlanFragmenter fragmenter = new PlanFragmenter(); PlanFragmenter.Context fragmenterContext = fragmenter.createContext(); node = node.visit(fragmenter, fragmenterContext); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/metrics/PinotMetricsFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/metrics/PinotMetricsFactory.java index c78b429d2b..ec1dc9d706 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/metrics/PinotMetricsFactory.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/metrics/PinotMetricsFactory.java @@ -20,6 +20,7 @@ package org.apache.pinot.spi.annotations.metrics; import java.util.function.Function; import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.metrics.NoopPinotMetricsRegistry; import org.apache.pinot.spi.metrics.PinotGauge; import org.apache.pinot.spi.metrics.PinotJmxReporter; import org.apache.pinot.spi.metrics.PinotMetricName; @@ -60,4 +61,37 @@ public interface PinotMetricsFactory { * Returns the name of metrics factory. */ String getMetricsFactoryName(); + + class Noop implements PinotMetricsFactory { + private final NoopPinotMetricsRegistry _registry = new NoopPinotMetricsRegistry(); + @Override + public void init(PinotConfiguration metricsConfiguration) { + } + + @Override + public PinotMetricsRegistry getPinotMetricsRegistry() { + return _registry; + } + + @Override + public PinotMetricName makePinotMetricName(Class<?> klass, String name) { + return () -> "noopMetricName"; + } + + @Override + public <T> PinotGauge<T> makePinotGauge(Function<Void, T> condition) { + return _registry.newGauge(); + } + + @Override + public PinotJmxReporter makePinotJmxReporter(PinotMetricsRegistry metricsRegistry) { + return () -> { + }; + } + + @Override + public String getMetricsFactoryName() { + return "noop"; + } + } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/metrics/NoopPinotMetricsRegistry.java b/pinot-spi/src/main/java/org/apache/pinot/spi/metrics/NoopPinotMetricsRegistry.java new file mode 100644 index 0000000000..b7f80d9878 --- /dev/null +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/metrics/NoopPinotMetricsRegistry.java @@ -0,0 +1,228 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.spi.metrics; + +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + + +public class NoopPinotMetricsRegistry implements PinotMetricsRegistry { + @Override + public void removeMetric(PinotMetricName name) { + } + + public <T> PinotGauge<T> newGauge() { + return new PinotGauge<T>() { + @Override + public T value() { + return null; + } + + @Override + public Object getGauge() { + return null; + } + + @Override + public Object getMetric() { + return null; + } + + @Override + public void setValue(T value) { + } + + @Override + public void setValueSupplier(Supplier<T> valueSupplier) { + } + }; + } + + @Override + public <T> PinotGauge<T> newGauge(PinotMetricName name, PinotGauge<T> gauge) { + return newGauge(); + } + + @Override + public PinotMeter newMeter(PinotMetricName name, String eventType, TimeUnit unit) { + return new PinotMeter() { + @Override + public void mark() { + } + + @Override + public void mark(long unitCount) { + } + + @Override + public long count() { + return 0; + } + + @Override + public Object getMetered() { + return null; + } + + @Override + public TimeUnit rateUnit() { + return null; + } + + @Override + public String eventType() { + return ""; + } + + @Override + public double fifteenMinuteRate() { + return 0; + } + + @Override + public double fiveMinuteRate() { + return 0; + } + + @Override + public double meanRate() { + return 0; + } + + @Override + public double oneMinuteRate() { + return 0; + } + + @Override + public Object getMetric() { + return null; + } + }; + } + + @Override + public PinotCounter newCounter(PinotMetricName name) { + return new PinotCounter() { + @Override + public Object getCounter() { + return null; + } + + @Override + public Object getMetric() { + return null; + } + }; + } + + @Override + public PinotTimer newTimer(PinotMetricName name, TimeUnit durationUnit, TimeUnit rateUnit) { + return new PinotTimer() { + @Override + public void update(long duration, TimeUnit unit) { + } + + @Override + public Object getTimer() { + return null; + } + + @Override + public Object getMetered() { + return null; + } + + @Override + public TimeUnit rateUnit() { + return null; + } + + @Override + public String eventType() { + return ""; + } + + @Override + public long count() { + return 0; + } + + @Override + public double fifteenMinuteRate() { + return 0; + } + + @Override + public double fiveMinuteRate() { + return 0; + } + + @Override + public double meanRate() { + return 0; + } + + @Override + public double oneMinuteRate() { + return 0; + } + + @Override + public Object getMetric() { + return null; + } + }; + } + + @Override + public PinotHistogram newHistogram(PinotMetricName name, boolean biased) { + return new PinotHistogram() { + @Override + public Object getHistogram() { + return null; + } + + @Override + public Object getMetric() { + return null; + } + }; + } + + @Override + public Map<PinotMetricName, PinotMetric> allMetrics() { + return Collections.emptyMap(); + } + + @Override + public void addListener(PinotMetricsRegistryListener listener) { + } + + @Override + public Object getMetricsRegistry() { + return new Object(); + } + + @Override + public void shutdown() { + } +} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/metrics/PinotMetricUtils.java b/pinot-spi/src/main/java/org/apache/pinot/spi/metrics/PinotMetricUtils.java index 49fcfb7e9b..93ac36e07c 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/metrics/PinotMetricUtils.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/metrics/PinotMetricUtils.java @@ -51,7 +51,8 @@ public class PinotMetricUtils { private static final Map<MetricsRegistryRegistrationListener, Boolean> METRICS_REGISTRY_REGISTRATION_LISTENERS_MAP = new ConcurrentHashMap<>(); - private static PinotMetricsFactory _pinotMetricsFactory = null; + private static final PinotMetricsFactory NOOP_FACTORY = new PinotMetricsFactory.Noop(); + private static PinotMetricsFactory _pinotMetricsFactory = NOOP_FACTORY; /** * Initialize the metricsFactory ad registers the metricsRegistry @@ -98,7 +99,7 @@ public class PinotMetricUtils { } ); - Preconditions.checkState(_pinotMetricsFactory != null, + Preconditions.checkState(_pinotMetricsFactory != NOOP_FACTORY, "Failed to initialize PinotMetricsFactory. Please check if any pinot-metrics related jar is actually added to" + " the classpath."); } @@ -194,9 +195,11 @@ public class PinotMetricUtils { */ @VisibleForTesting public static void cleanUp() { - if (_pinotMetricsFactory != null) { + if (_pinotMetricsFactory == null) { + _pinotMetricsFactory = NOOP_FACTORY; + } else if (_pinotMetricsFactory != NOOP_FACTORY) { _pinotMetricsFactory.getPinotMetricsRegistry().shutdown(); - _pinotMetricsFactory = null; + _pinotMetricsFactory = NOOP_FACTORY; } } @@ -206,7 +209,7 @@ public class PinotMetricUtils { * @param metricsConfiguration metrics configs */ public static synchronized PinotMetricsRegistry getPinotMetricsRegistry(PinotConfiguration metricsConfiguration) { - if (_pinotMetricsFactory == null) { + if (_pinotMetricsFactory == null || _pinotMetricsFactory == NOOP_FACTORY) { init(metricsConfiguration); } return _pinotMetricsFactory.getPinotMetricsRegistry(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org