gortiz commented on code in PR #12704:
URL: https://github.com/apache/pinot/pull/12704#discussion_r1565423170


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java:
##########
@@ -19,107 +19,295 @@
 package org.apache.pinot.query.runtime.operator;
 
 import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import java.io.DataInput;
+import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.datatable.StatMap;
+import org.apache.pinot.common.response.broker.BrokerResponseNativeV2;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.query.runtime.plan.MultiStageQueryStats;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerOperator;
 import org.apache.pinot.spi.exception.EarlyTerminationException;
 import org.apache.pinot.spi.trace.InvocationScope;
 import org.apache.pinot.spi.trace.Tracing;
 import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
-public abstract class MultiStageOperator implements 
Operator<TransferableBlock>, AutoCloseable {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(MultiStageOperator.class);
+public abstract class MultiStageOperator<K extends Enum<K> & StatMap.Key>
+    implements Operator<TransferableBlock>, AutoCloseable {
 
   protected final OpChainExecutionContext _context;
   protected final String _operatorId;
-  protected final OpChainStats _opChainStats;
+  protected final StatMap<K> _statMap;
   protected boolean _isEarlyTerminated;
 
-  public MultiStageOperator(OpChainExecutionContext context) {
+  public MultiStageOperator(OpChainExecutionContext context, Class<K> 
keyStatClass) {
     _context = context;
     _operatorId = Joiner.on("_").join(getClass().getSimpleName(), 
_context.getStageId(), _context.getServer());
-    _opChainStats = _context.getStats();
     _isEarlyTerminated = false;
+    _statMap = new StatMap<>(keyStatClass);
   }
 
+  protected abstract Logger logger();
+
+  public abstract Type getOperatorType();
+
+  public abstract K getExecutionTimeKey();
+
+  public abstract K getEmittedRowsKey();
+
   @Override
   public TransferableBlock nextBlock() {
     if (Tracing.ThreadAccountantOps.isInterrupted()) {
       throw new EarlyTerminationException("Interrupted while processing next 
block");
     }
+    if (logger().isDebugEnabled()) {
+      logger().debug("Operator {}: Reading next block", _operatorId);
+    }
     try (InvocationScope ignored = 
Tracing.getTracer().createScope(getClass())) {
       TransferableBlock nextBlock;
       if (shouldCollectStats()) {
-        OperatorStats operatorStats = _opChainStats.getOperatorStats(_context, 
_operatorId);
-        operatorStats.startTimer();
+        Stopwatch executeStopwatch = Stopwatch.createStarted();

Review Comment:
   We discussed on whether we want to always collect time stats or not. Right 
now this PR is not modifying that, but it should be as easy as to always return 
true in `shouldCollectStats` or just remove this if



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to