gortiz commented on code in PR #12704:
URL: https://github.com/apache/pinot/pull/12704#discussion_r1565545708


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/MultiStageQueryStats.java:
##########
@@ -0,0 +1,668 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.plan;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Preconditions;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import org.apache.avro.util.ByteBufferInputStream;
+import org.apache.commons.io.output.UnsynchronizedByteArrayOutputStream;
+import org.apache.pinot.common.datatable.StatMap;
+import org.apache.pinot.query.runtime.operator.BaseMailboxReceiveOperator;
+import 
org.apache.pinot.query.runtime.operator.LeafStageTransferableBlockOperator;
+import org.apache.pinot.query.runtime.operator.LiteralValueOperator;
+import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
+import org.apache.pinot.query.runtime.operator.MultiStageOperator;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The stats of a given query.
+ * <p>
+ * For the same query, multiple instances of this class may exist. Each of 
them will have a partial view of the stats.
+ * Specifically, while the query is being executed, each operator will return 
its own partial view of the stats when
+ * EOS block is sent.
+ * <p>
+ * Simple operations with a single upstream, like filters or transforms, would 
just add their own information to the
+ * stats. More complex operations, like joins or receiving mailboxes, will 
merge the stats from all their upstreams and
+ * add their own stats.
+ * <p>
+ * The complete stats for the query are obtained in the execution root 
(usually the broker) by merging the partial
+ * views.
+ * <p>
+ * In order to reduce allocation, this class is mutable. Some operators may 
create their own stats, but most of them
+ * will receive a stats object from the upstream operator and modify it by 
adding their own stats and sometimes merging
+ * them with other upstream stats.
+ */
+public class MultiStageQueryStats {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(MultiStageQueryStats.class);
+  private final int _currentStageId;
+  private final StageStats.Open _currentStats;
+  /**
+   * Known stats for stages whose id is higher than the current one.
+   * <p>
+   * A stage may not know all the stats whose id is higher than the current 
one, so this list may contain null values.
+   * It may also grow in size when different merge methods are called.
+   * <p>
+   * For example the stats of the left hand side of a join may know stats of 
stages 3 and 4 and the right side may know
+   * stats of stages 5. When merging the stats of the join, the stats of 
stages 5 will be added to this list.
+   *
+   * @see #mergeUpstream(List)
+   * @see #mergeUpstream(MultiStageQueryStats)
+   * @see #mergeInOrder(MultiStageQueryStats, MultiStageOperator.Type, StatMap)
+   */
+  private final ArrayList<StageStats.Closed> _closedStats;
+  private static final MultiStageOperator.Type[] ALL_TYPES = 
MultiStageOperator.Type.values();
+
+  private MultiStageQueryStats(int stageId) {
+    _currentStageId = stageId;
+    _currentStats = new StageStats.Open();
+    _closedStats = new ArrayList<>();
+  }
+
+  private static MultiStageQueryStats create(int stageId, 
MultiStageOperator.Type type, @Nullable StatMap<?> opStats) {
+    MultiStageQueryStats multiStageQueryStats = new 
MultiStageQueryStats(stageId);
+    multiStageQueryStats.getCurrentStats().addLastOperator(type, opStats);
+    return multiStageQueryStats;
+  }
+
+  public static MultiStageQueryStats emptyStats(int stageId) {
+    return new MultiStageQueryStats(stageId);
+  }
+
+  public static MultiStageQueryStats createLeaf(int stageId,
+      StatMap<LeafStageTransferableBlockOperator.StatKey> opStats) {
+    return create(stageId, MultiStageOperator.Type.LEAF, opStats);
+  }
+
+  public static MultiStageQueryStats createLiteral(int stageId, 
StatMap<LiteralValueOperator.StatKey> statMap) {
+    return create(stageId, MultiStageOperator.Type.LITERAL, statMap);
+  }
+
+  public static MultiStageQueryStats createCancelledSend(int stageId,
+      StatMap<MailboxSendOperator.StatKey> statMap) {
+    return create(stageId, MultiStageOperator.Type.MAILBOX_SEND, statMap);
+  }
+
+  public static MultiStageQueryStats createReceive(int stageId, 
StatMap<BaseMailboxReceiveOperator.StatKey> stats) {
+    return create(stageId, MultiStageOperator.Type.MAILBOX_RECEIVE, stats);
+  }
+
+  public int getCurrentStageId() {
+    return _currentStageId;
+  }
+
+  /**
+   * Serialize the current stats in a way it is compatible with {@link 
#mergeUpstream(List)}.
+   * <p>
+   * The serialized stats are returned in a list where the index is the stage 
id. Stages downstream or not related to
+   * the current one will be null.
+   */
+  public List<ByteBuffer> serialize()
+      throws IOException {
+
+    ArrayList<ByteBuffer> serializedStats = new ArrayList<>(getMaxStageId());
+    for (int i = 0; i < _currentStageId; i++) {
+      serializedStats.add(null);
+    }
+
+    try (UnsynchronizedByteArrayOutputStream baos = new 
UnsynchronizedByteArrayOutputStream.Builder().get();
+        DataOutputStream output = new DataOutputStream(baos)) {
+
+      _currentStats.serialize(output);
+      ByteBuffer currentBuf = ByteBuffer.wrap(baos.toByteArray());
+
+      serializedStats.add(currentBuf);
+
+      for (StageStats.Closed closedStats : _closedStats) {
+        if (closedStats == null) {
+          serializedStats.add(null);
+          continue;
+        }
+        baos.reset();
+        closedStats.serialize(output);
+        ByteBuffer buf = ByteBuffer.wrap(baos.toByteArray());
+        serializedStats.add(buf);
+      }
+    }
+    Preconditions.checkState(serializedStats.size() == getMaxStageId() + 1,
+        "Serialized stats size is different from expected size. Expected %s, 
got %s",
+        getMaxStageId() + 1, serializedStats.size());
+    return serializedStats;
+  }
+
+  public StageStats.Open getCurrentStats() {
+    return _currentStats;
+  }
+
+  /**
+   * Returns the higher stage id known by this object.
+   */
+  public int getMaxStageId() {
+    return _currentStageId + _closedStats.size();
+  }
+
+  /**
+   * Get the stats of a stage whose id is higher than the current one.
+   * <p>
+   * This method returns null in case the stage id is unknown by this stage or 
no stats are stored for it.
+   */
+  @Nullable
+  public StageStats.Closed getUpstreamStageStats(int stageId) {
+    if (stageId <= _currentStageId) {
+      throw new IllegalArgumentException("Stage " + stageId + " cannot be 
upstream of current stage "
+          + _currentStageId);
+    }
+
+    int index = stageId - _currentStageId - 1;
+    if (index >= _closedStats.size()) {
+      return null;
+    }
+    return _closedStats.get(index);
+  }
+
+  public void mergeInOrder(MultiStageQueryStats otherStats, 
MultiStageOperator.Type type,
+      StatMap<?> statMap) {
+    Preconditions.checkArgument(_currentStageId == otherStats._currentStageId,
+        "Cannot merge stats from different stages (%s and %s)", 
_currentStageId, otherStats._currentStageId);
+    mergeUpstream(otherStats);
+    StageStats.Open currentStats = getCurrentStats();
+    currentStats.concat(otherStats.getCurrentStats());
+    currentStats.addLastOperator(type, statMap);
+  }
+
+  private void growUpToStage(int stageId) {
+    _closedStats.ensureCapacity(stageId - _currentStageId);
+    while (getMaxStageId() < stageId) {
+      _closedStats.add(null);
+    }
+  }
+
+  /**
+   * Merge upstream stats from another MultiStageQueryStats object into this 
one.
+   * <p>
+   * Only the stages whose id is higher than the current one are merged. The 
reason to do so is that upstream stats
+   * should be already closed while current stage may need some extra tuning.
+   * <p>
+   * For example set operations may need to merge the stats from all its 
upstreams before concatenating stats of the
+   * current stage.
+   */
+  public void mergeUpstream(MultiStageQueryStats otherStats) {
+    Preconditions.checkArgument(_currentStageId <= otherStats._currentStageId,
+        "Cannot merge stats from early stage %s into stats of later stage %s",
+        otherStats._currentStageId, _currentStageId);
+
+    growUpToStage(otherStats.getMaxStageId());
+
+    int currentDiff = otherStats._currentStageId - _currentStageId;
+    if (currentDiff > 0) {
+      StageStats.Closed close = otherStats._currentStats.close();
+      int selfIdx = currentDiff - 1;
+      StageStats.Closed myStats = _closedStats.get(selfIdx);
+      if (myStats == null) {
+        _closedStats.set(selfIdx, close);
+      } else {
+        myStats.merge(close);
+      }
+    }
+
+    for (int i = 0; i < otherStats._closedStats.size(); i++) {
+      StageStats.Closed otherStatsForStage = otherStats._closedStats.get(i);
+      if (otherStatsForStage == null) {
+        continue;
+      }
+      int selfIdx = i + currentDiff;
+      StageStats.Closed myStats = _closedStats.get(selfIdx);
+      try {
+        if (myStats == null) {
+          _closedStats.set(selfIdx, otherStatsForStage);
+          assert getUpstreamStageStats(i + otherStats._currentStageId + 1) == 
otherStatsForStage;
+        } else {
+          myStats.merge(otherStatsForStage);
+        }
+      } catch (IllegalArgumentException | IllegalStateException ex) {
+        LOGGER.warn("Error merging stats on stage " + i + ". Ignoring the new 
stats", ex);
+      }
+    }
+  }
+
+  public void mergeUpstream(List<ByteBuffer> otherStats) {
+    for (int i = 0; i <= _currentStageId && i < otherStats.size(); i++) {
+      if (otherStats.get(i) != null) {
+        throw new IllegalArgumentException("Cannot merge stats from early 
stage " + i + " into stats of "
+            + "later stage " + _currentStageId);
+      }
+    }
+    growUpToStage(otherStats.size() - 1);
+
+    for (int i = _currentStageId + 1; i < otherStats.size(); i++) {
+      ByteBuffer otherBuf = otherStats.get(i);
+      if (otherBuf != null) {
+        StageStats.Closed myStats = getUpstreamStageStats(i);
+        try (InputStream is = new 
ByteBufferInputStream(Collections.singletonList(otherBuf));
+            DataInputStream dis = new DataInputStream(is)) {
+          if (myStats == null) {
+            StageStats.Closed deserialized = 
StageStats.Closed.deserialize(dis);
+            _closedStats.set(i - _currentStageId - 1, deserialized);
+            assert getUpstreamStageStats(i) == deserialized;
+          } else {
+            myStats.merge(dis);
+          }
+        } catch (IOException ex) {
+          boolean assertOn = false;
+          // *assigns* true if assertions are on.
+          //CHECKSTYLE:OFF
+          assert assertOn = true;
+          if (assertOn) {
+            throw new UncheckedIOException("Error deserializing stats on stage 
" + i, ex);
+          }
+          //CHECKSTYLE:ON

Review Comment:
   This is the _standard_ way to detect whether asserts are enabled or not.
Maybe we should create a static utility class that does the same thing, especially given that
Checkstyle doesn't like this code style.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to