This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch ignite-27981
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit a370bf6e63f4594332b690f727649be014f0bf8e
Author: AMashenkov <[email protected]>
AuthorDate: Wed Feb 25 15:26:48 2026 +0300

    wip
---
 .../internal/sql/engine/exec/ExecutionContext.java |  3 +
 .../sql/engine/exec/ExecutionServiceImpl.java      |  1 +
 .../sql/engine/exec/LogicalRelImplementor.java     |  1 +
 .../internal/sql/engine/exec/rel/AbstractNode.java | 46 ++++++++++++++
 .../rel/AbstractRightMaterializedJoinNode.java     | 28 +++++++++
 .../sql/engine/exec/rel/AbstractSetOpNode.java     |  4 ++
 .../sql/engine/exec/rel/AsyncRootNode.java         | 70 +++++++++++++++++++++-
 .../exec/rel/CorrelatedNestedLoopJoinNode.java     | 28 +++++++++
 .../internal/sql/engine/exec/rel/FilterNode.java   | 19 ++++++
 .../sql/engine/exec/rel/HashAggregateNode.java     |  4 ++
 .../ignite/internal/sql/engine/exec/rel/Inbox.java | 64 ++++++++++++++++++++
 .../sql/engine/exec/rel/IndexScanNode.java         |  7 +++
 .../sql/engine/exec/rel/IndexSpoolNode.java        | 16 +++++
 .../internal/sql/engine/exec/rel/LimitNode.java    | 30 ++++++++--
 .../sql/engine/exec/rel/MergeJoinNode.java         | 28 +++++++++
 .../sql/engine/exec/rel/MetricsAwareNode.java      | 46 ++++++++++++++
 .../internal/sql/engine/exec/rel/ModifyNode.java   |  4 ++
 .../ignite/internal/sql/engine/exec/rel/Node.java  |  2 +-
 .../internal/sql/engine/exec/rel/Outbox.java       | 54 +++++++++++++++++
 .../internal/sql/engine/exec/rel/ProjectNode.java  |  4 ++
 .../internal/sql/engine/exec/rel/RootNode.java     |  2 +
 .../internal/sql/engine/exec/rel/ScanNode.java     |  2 +
 .../sql/engine/exec/rel/SortAggregateNode.java     |  4 ++
 .../internal/sql/engine/exec/rel/SortNode.java     |  4 ++
 .../sql/engine/exec/rel/StorageScanNode.java       | 55 ++++++++++++++++-
 .../sql/engine/exec/rel/TableScanNode.java         | 13 ++++
 .../sql/engine/exec/rel/TableSpoolNode.java        |  4 ++
 .../internal/sql/engine/exec/rel/UnionAllNode.java |  4 ++
 .../sql/engine/exec/rel/AsyncRootNodeTest.java     |  6 +-
 .../sql/engine/exec/rel/ExchangeExecutionTest.java |  6 +-
 .../sql/engine/exec/rel/ExecutionTest.java         |  5 ++
 .../exec/rel/TableScanNodeExecutionTest.java       |  8 ++-
 32 files changed, 554 insertions(+), 18 deletions(-)

diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
index 5716c1fbaa4..bc6eaac3b3c 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
@@ -37,6 +37,7 @@ import org.apache.calcite.linq4j.QueryProvider;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
 import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.lang.IgniteSystemProperties;
 import org.apache.ignite.internal.lang.RunnableX;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
@@ -65,6 +66,8 @@ import org.jetbrains.annotations.Nullable;
 public class ExecutionContext<RowT> implements SqlEvaluationContext<RowT> {
     private static final IgniteLogger LOG = 
Loggers.forClass(ExecutionContext.class);
 
+    public static final boolean DUMP_METRICS = 
IgniteSystemProperties.getBoolean("IGNITE_DUMP_QUERY_METRICS_TO_LOGS", false);
+
     /**
      * TODO: https://issues.apache.org/jira/browse/IGNITE-15276 Support other 
locales.
      */
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index ce1e20f4a9c..35fbc350053 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -1054,6 +1054,7 @@ public class ExecutionServiceImpl<RowT> implements 
ExecutionService, LogicalTopo
                 );
 
                 AsyncRootNode<RowT, InternalSqlRow> rootNode = new 
AsyncRootNode<>(
+                        ectx,
                         node,
                         inRow -> new InternalSqlRowImpl<>(inRow, 
ectx.rowAccessor(), internalTypeConverter));
                 node.onRegister(rootNode);
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
index dfd117016ca..3d53320e8f7 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
@@ -575,6 +575,7 @@ public class LogicalRelImplementor<RowT> implements 
IgniteRelVisitor<Node<RowT>>
         return new TableScanNode<>(
                 ctx,
                 rowFactory,
+                tbl,
                 scannableTable,
                 partitionProvider,
                 filters,
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractNode.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractNode.java
index e810e1c20dd..9c4b535fe10 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractNode.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractNode.java
@@ -62,6 +62,11 @@ public abstract class AbstractNode<RowT> implements 
Node<RowT> {
 
     private List<Node<RowT>> sources;
 
+    // Metrics
+    protected int requestCount = 0;
+    protected int rewindCount = 0;
+    protected long receivedRowsCount = 0L;
+
     /**
      * Constructor.
      * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
@@ -112,6 +117,8 @@ public abstract class AbstractNode<RowT> implements 
Node<RowT> {
     /** {@inheritDoc} */
     @Override
     public void rewind() {
+        onRewind();
+
         rewindInternal();
 
         if (!nullOrEmpty(sources())) {
@@ -219,4 +226,43 @@ public abstract class AbstractNode<RowT> implements 
Node<RowT> {
             Debuggable.dumpState(writer, Debuggable.childIndentation(indent), 
sources);
         }
     }
+
+    @Override
+    public void dumpNodeMetrics(IgniteStringBuilder writer, String indent) {
+        writer.app(indent);
+        dumpMetrics0(writer);
+        writer.nl();
+
+        MetricsAwareNode.dumpChildNodesMetrics(writer, indent, sources);
+    }
+
+    protected void dumpMetrics0(IgniteStringBuilder writer) {
+        writer.app(this.getClass().getSimpleName()).app(": ");
+
+        writer.app("receivedRows=").app(receivedRowsCount);
+
+        if (requestCount > 0) {
+            writer.app(", requests=").app(requestCount);
+        }
+
+        if (rewindCount > 0) {
+            writer.app(", rewinds=").app(rewindCount);
+        }
+    }
+
+    protected final void onRequestReceived() {
+        requestCount++;
+    }
+
+    protected final void onRowReceived() {
+        receivedRowsCount++;
+    }
+
+    protected final void onRowsReceived(long rowsCount) {
+        receivedRowsCount += rowsCount;
+    }
+
+    protected final void onRewind() {
+        rewindCount++;
+    }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractRightMaterializedJoinNode.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractRightMaterializedJoinNode.java
index 1e81985add5..0a729417df6 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractRightMaterializedJoinNode.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractRightMaterializedJoinNode.java
@@ -34,6 +34,10 @@ public abstract class 
AbstractRightMaterializedJoinNode<RowT> extends AbstractNo
     final Deque<RowT> leftInBuf = new ArrayDeque<>(inBufSize);
     protected @Nullable RowT left;
 
+    // Metrics
+    private long receivedRowsFromLeft = 0L;
+    private long receiveRowsFromRight = 0L;
+
     AbstractRightMaterializedJoinNode(ExecutionContext<RowT> ctx) {
         super(ctx);
     }
@@ -44,6 +48,8 @@ public abstract class AbstractRightMaterializedJoinNode<RowT> 
extends AbstractNo
         assert !nullOrEmpty(sources()) && sources().size() == 2;
         assert rowsCnt > 0 && requested == 0;
 
+        onRequestReceived();
+
         requested = rowsCnt;
 
         if (!inLoop) {
@@ -70,6 +76,8 @@ public abstract class AbstractRightMaterializedJoinNode<RowT> 
extends AbstractNo
                 /** {@inheritDoc} */
                 @Override
                 public void push(RowT row) throws Exception {
+                    onRowReceivedFromLeft();
+
                     pushLeft(row);
                 }
 
@@ -90,6 +98,8 @@ public abstract class AbstractRightMaterializedJoinNode<RowT> 
extends AbstractNo
                 /** {@inheritDoc} */
                 @Override
                 public void push(RowT row) throws Exception {
+                    onRowReceivedFromRight();
+
                     pushRight(row);
                 }
 
@@ -158,4 +168,22 @@ public abstract class 
AbstractRightMaterializedJoinNode<RowT> extends AbstractNo
     protected abstract void join() throws Exception;
 
     protected abstract void pushRight(RowT row) throws Exception;
+
+    @Override
+    protected void dumpMetrics0(IgniteStringBuilder writer) {
+        // Calculate aggregated statistics.
+        onRowsReceived(receivedRowsFromLeft + receiveRowsFromRight);
+
+        super.dumpMetrics0(writer);
+        writer.app(", leftRows=").app(receivedRowsFromLeft)
+                .app(", rightRows=").app(receiveRowsFromRight);
+    }
+
+    private void onRowReceivedFromLeft() {
+        receivedRowsFromLeft++;
+    }
+
+    private void onRowReceivedFromRight() {
+        receiveRowsFromRight++;
+    }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractSetOpNode.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractSetOpNode.java
index c436f4fd999..79ea3281783 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractSetOpNode.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractSetOpNode.java
@@ -65,6 +65,8 @@ public abstract class AbstractSetOpNode<RowT> extends 
AbstractNode<RowT> {
         assert rowsCnt > 0 && requested == 0;
         assert waiting <= 0;
 
+        onRequestReceived();
+
         requested = rowsCnt;
 
         if (waiting == 0) {
@@ -132,6 +134,8 @@ public abstract class AbstractSetOpNode<RowT> extends 
AbstractNode<RowT> {
         return new Downstream<>() {
             @Override
             public void push(RowT row) throws Exception {
+                onRowReceived();
+
                 AbstractSetOpNode.this.push(row, idx);
             }
 
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java
index dcc33bdd1ba..23a5048996d 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java
@@ -24,12 +24,18 @@ import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Queue;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
+import org.apache.ignite.internal.lang.Debuggable;
+import org.apache.ignite.internal.lang.IgniteStringBuilder;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.sql.engine.QueryCancelledException;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
 import org.apache.ignite.internal.util.AsyncCursor;
 import org.apache.ignite.lang.CursorClosedException;
 import org.jetbrains.annotations.Nullable;
@@ -38,6 +44,7 @@ import org.jetbrains.annotations.Nullable;
  * An async iterator over the execution tree.
  */
 public class AsyncRootNode<InRowT, OutRowT> implements Downstream<InRowT>, 
AsyncCursor<OutRowT> {
+    public static final IgniteLogger LOGGER = 
Loggers.forClass(AsyncRootNode.class);
     private final CompletableFuture<Void> cancelFut = new 
CompletableFuture<>();
 
     /**
@@ -69,15 +76,25 @@ public class AsyncRootNode<InRowT, OutRowT> implements 
Downstream<InRowT>, Async
      */
     private int waiting;
 
+    // Metrics
+    private long rowsReceived;
+    private long queryStartTime = -1L;
+    private long prefetchTime = -1L;
+    private long queryTime = -1L;
+    private final long fragmentId;
+    private final UUID queryId;
+
     /**
      * Constructor.
      *
      * @param source A source to requests rows from.
      * @param converter A converter to convert rows from an internal format to 
desired output format.
      */
-    public AsyncRootNode(AbstractNode<InRowT> source, Function<InRowT, 
OutRowT> converter) {
+    public AsyncRootNode(ExecutionContext<InRowT> ctx, AbstractNode<InRowT> 
source, Function<InRowT, OutRowT> converter) {
         this.source = source;
         this.converter = converter;
+        queryId = ctx.queryId();
+        fragmentId = ctx.description().fragmentId();
     }
 
     /** {@inheritDoc} */
@@ -85,6 +102,8 @@ public class AsyncRootNode<InRowT, OutRowT> implements 
Downstream<InRowT>, Async
     public void push(InRowT row) throws Exception {
         assert waiting > 0 : waiting;
 
+        onRowReceived();
+
         buff.add(converter.apply(row));
 
         if (--waiting == 0) {
@@ -104,6 +123,10 @@ public class AsyncRootNode<InRowT, OutRowT> implements 
Downstream<InRowT>, Async
         completePrefetchFuture(null);
 
         flush();
+
+        if (ExecutionContext.DUMP_METRICS) {
+            dumpQueryMetrics();
+        }
     }
 
     /** {@inheritDoc} */
@@ -199,6 +222,8 @@ public class AsyncRootNode<InRowT, OutRowT> implements 
Downstream<InRowT>, Async
     public CompletableFuture<Void> startPrefetch() {
         assert source.context().description().prefetch();
 
+        onQueryStarted();
+
         if (waiting == 0) {
             try {
                 source.checkState();
@@ -259,6 +284,7 @@ public class AsyncRootNode<InRowT, OutRowT> implements 
Downstream<InRowT>, Async
             } else if (waiting == NOT_WAITING) {
                 assert hasMore == HasMore.NO : hasMore;
 
+                onQueryFinish();
                 closeAsync();
             }
         } else if (!pendingRequests.isEmpty()) {
@@ -272,6 +298,8 @@ public class AsyncRootNode<InRowT, OutRowT> implements 
Downstream<InRowT>, Async
     private void scheduleTask() {
         if (!pendingRequests.isEmpty() && taskScheduled.compareAndSet(false, 
true)) {
             source.execute(() -> {
+                onQueryStarted();
+
                 taskScheduled.set(false);
 
                 flush();
@@ -286,6 +314,8 @@ public class AsyncRootNode<InRowT, OutRowT> implements 
Downstream<InRowT>, Async
      */
     private void completePrefetchFuture(@Nullable Throwable ex) {
         if (!prefetchFut.isDone()) {
+            onPrefetchFinished();
+
             if (ex != null) {
                 prefetchFut.completeExceptionally(ex);
             } else {
@@ -320,4 +350,42 @@ public class AsyncRootNode<InRowT, OutRowT> implements 
Downstream<InRowT>, Async
     private enum HasMore {
         YES, NO, UNCERTAIN
     }
+
+    private void dumpQueryMetrics() {
+        IgniteStringBuilder sb = new IgniteStringBuilder();
+        sb.app("Dump metrics for executed query: 
queryId=").app(queryId).app(", fragmentId=").app(fragmentId).nl();
+        sb.app("RootNode: rows=").app(rowsReceived)
+                .app(", 
prefetch=").app(MetricsAwareNode.beautifyNanoTime(prefetchTime))
+                .app(", 
totalTime=").app(MetricsAwareNode.beautifyNanoTime((queryTime)))
+                .nl();
+
+        MetricsAwareNode.dumpChildNodesMetrics(sb, 
Debuggable.childIndentation(""), List.of(source));
+
+        LOGGER.info(sb.toString());
+    }
+
+    private void onRowReceived() {
+        rowsReceived++;
+    }
+
+    private void onQueryStarted() {
+        if (queryStartTime == -1) {
+            queryStartTime = System.nanoTime();
+        }
+    }
+
+    private void onPrefetchFinished() {
+        if (prefetchTime == -1) {
+            prefetchTime = System.nanoTime() - queryStartTime;
+        }
+    }
+
+    private void onQueryFinish() {
+        if (queryStartTime != -1) {
+            onPrefetchFinished();
+
+            queryTime += System.nanoTime() - queryStartTime;
+            queryStartTime = -1;
+        }
+    }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/CorrelatedNestedLoopJoinNode.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/CorrelatedNestedLoopJoinNode.java
index ecc5bbabe81..4fb0ac4d1f7 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/CorrelatedNestedLoopJoinNode.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/CorrelatedNestedLoopJoinNode.java
@@ -74,6 +74,10 @@ public class CorrelatedNestedLoopJoinNode<RowT> extends 
AbstractNode<RowT> {
         INITIAL, FILLING_LEFT, FILLING_RIGHT, IDLE, IN_LOOP, END
     }
 
+    // Metrics
+    private long receivedRowsFromLeft = 0L;
+    private long receiveRowsFromRight = 0L;
+
     /**
      * Creates CorrelatedNestedLoopJoin node.
      *
@@ -114,6 +118,8 @@ public class CorrelatedNestedLoopJoinNode<RowT> extends 
AbstractNode<RowT> {
         assert !nullOrEmpty(sources()) && sources().size() == 2;
         assert rowsCnt > 0 && requested == 0;
 
+        onRequestReceived();
+
         requested = rowsCnt;
 
         onRequest();
@@ -196,6 +202,8 @@ public class CorrelatedNestedLoopJoinNode<RowT> extends 
AbstractNode<RowT> {
         assert downstream() != null;
         assert waitingLeft > 0;
 
+        onRowReceivedFromLeft();
+
         waitingLeft--;
 
         if (leftInBuf == null) {
@@ -211,6 +219,8 @@ public class CorrelatedNestedLoopJoinNode<RowT> extends 
AbstractNode<RowT> {
         assert downstream() != null;
         assert waitingRight > 0;
 
+        onRowReceivedFromRight();
+
         waitingRight--;
 
         if (rightInBuf == null) {
@@ -488,4 +498,22 @@ public class CorrelatedNestedLoopJoinNode<RowT> extends 
AbstractNode<RowT> {
             context().correlatedVariable(row, correlationIds.get(i).getId());
         }
     }
+
+    @Override
+    protected void dumpMetrics0(IgniteStringBuilder writer) {
+        // Calculate aggregated statistics.
+        onRowsReceived(receivedRowsFromLeft + receiveRowsFromRight);
+
+        super.dumpMetrics0(writer);
+        writer.app(", leftRows=").app(receivedRowsFromLeft)
+                .app(", rightRows=").app(receiveRowsFromRight);
+    }
+
+    private void onRowReceivedFromLeft() {
+        receivedRowsFromLeft++;
+    }
+
+    private void onRowReceivedFromRight() {
+        receiveRowsFromRight++;
+    }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/FilterNode.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/FilterNode.java
index ce26c1beaf3..21e0246fc89 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/FilterNode.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/FilterNode.java
@@ -40,6 +40,9 @@ public class FilterNode<RowT> extends AbstractNode<RowT> 
implements SingleNode<R
 
     private boolean inLoop;
 
+    // Metrics
+    private long filteredRows;
+
     /**
      * Constructor.
      * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
@@ -59,6 +62,8 @@ public class FilterNode<RowT> extends AbstractNode<RowT> 
implements SingleNode<R
         assert !nullOrEmpty(sources()) && sources().size() == 1;
         assert rowsCnt > 0 && requested == 0;
 
+        onRequestReceived();
+
         requested = rowsCnt;
 
         if (!inLoop) {
@@ -72,10 +77,14 @@ public class FilterNode<RowT> extends AbstractNode<RowT> 
implements SingleNode<R
         assert downstream() != null;
         assert waiting > 0;
 
+        onRowReceived();
+
         waiting--;
 
         if (pred.test(row)) {
             inBuf.add(row);
+        } else {
+            onRowFiltered();
         }
 
         filter();
@@ -147,4 +156,14 @@ public class FilterNode<RowT> extends AbstractNode<RowT> 
implements SingleNode<R
                 .app(", requested=").app(requested)
                 .app(", waiting=").app(waiting);
     }
+
+    @Override
+    protected void dumpMetrics0(IgniteStringBuilder writer) {
+        super.dumpMetrics0(writer);
+        writer.app(", filteredRows=").app(filteredRows);
+    }
+
+    private void onRowFiltered() {
+        filteredRows++;
+    }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/HashAggregateNode.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/HashAggregateNode.java
index 0373336600b..ac608616065 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/HashAggregateNode.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/HashAggregateNode.java
@@ -102,6 +102,8 @@ public class HashAggregateNode<RowT> extends 
AbstractNode<RowT> implements Singl
         assert rowsCnt > 0 && requested == 0;
         assert waiting <= 0;
 
+        onRequestReceived();
+
         requested = rowsCnt;
 
         if (waiting == 0) {
@@ -117,6 +119,8 @@ public class HashAggregateNode<RowT> extends 
AbstractNode<RowT> implements Singl
         assert downstream() != null;
         assert waiting > 0;
 
+        onRowReceived();
+
         waiting--;
 
         for (Grouping grouping : groupings) {
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java
index 5c2534de4f1..ab6331cb2b6 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java
@@ -20,6 +20,8 @@ package org.apache.ignite.internal.sql.engine.exec.rel;
 import static org.apache.calcite.util.Util.unexpected;
 import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
 
+import it.unimi.dsi.fastutil.ints.Int2LongArrayMap;
+import it.unimi.dsi.fastutil.ints.Int2LongMap;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
@@ -116,6 +118,8 @@ public class Inbox<RowT> extends AbstractNode<RowT> 
implements Mailbox<RowT>, Si
     public void request(int rowsCnt) throws Exception {
         assert rowsCnt > 0 && requested == 0;
 
+        onRequestReceived();
+
         requested = rowsCnt;
 
         if (!inLoop) {
@@ -542,6 +546,14 @@ public class Inbox<RowT> extends AbstractNode<RowT> 
implements Mailbox<RowT>, Si
          */
         private @Nullable SharedState sharedStateHolder = null;
 
+        // Metrics
+        Int2LongMap batchTimestamps = new Int2LongArrayMap(IO_BATCH_CNT);
+        long rowsReceived = 0L;
+        long batchesRequested = 0L;
+        long requestTotalTime = 0L;
+        long requestMinTime = Long.MAX_VALUE;
+        long requestMaxTime = 0L;
+
         private RemoteSource(BatchRequester batchRequester) {
             this.batchRequester = batchRequester;
         }
@@ -555,6 +567,7 @@ public class Inbox<RowT> extends AbstractNode<RowT> 
implements Mailbox<RowT>, Si
         void reset(SharedState state) {
             sharedStateHolder = state;
             batches.clear();
+            batchTimestamps.clear();
 
             this.lastEnqueued = lastRequested;
             this.state = State.WAITING;
@@ -570,6 +583,8 @@ public class Inbox<RowT> extends AbstractNode<RowT> 
implements Mailbox<RowT>, Si
                 return;
             }
 
+            batchReceived(id, rows.size());
+
             batches.offer(new Batch<>(id, last, rows));
 
             if (state == State.WAITING && id == lastEnqueued + 1) {
@@ -588,6 +603,8 @@ public class Inbox<RowT> extends AbstractNode<RowT> 
implements Mailbox<RowT>, Si
             if (maxInFlightCount / 2 >= currentInFlightCount) {
                 int countOfBatches = maxInFlightCount - currentInFlightCount;
 
+                onBatchesRequested(countOfBatches);
+
                 lastRequested += countOfBatches;
 
                 batchRequester.request(countOfBatches, sharedStateHolder);
@@ -654,5 +671,52 @@ public class Inbox<RowT> extends AbstractNode<RowT> 
implements Mailbox<RowT>, Si
 
             lastEnqueued = curr.batchId;
         }
+
+        void dumpMetrics(IgniteStringBuilder writer, String indent, String 
nodeName) {
+            writer.app(indent).app("Remote node: nodeName=").app(nodeName)
+                    .app(", receivedRows=").app(rowsReceived)
+                    .app(", batchesRequested=").app(batchesRequested);
+
+            if (batchesRequested > 0) {
+                writer.app(", 
requestTime=[agv=").app(MetricsAwareNode.beautifyNanoTime(requestTotalTime / 
batchesRequested))
+                        .app(", 
min=").app(MetricsAwareNode.beautifyNanoTime(requestMinTime))
+                        .app(", 
max=").app(MetricsAwareNode.beautifyNanoTime(requestMaxTime)).app("]");
+            }
+        }
+
+        private void onBatchesRequested(int countOfBatches) {
+            long timestamp = System.nanoTime();
+            batchesRequested += countOfBatches;
+            for (int i = 1; i <= countOfBatches; i++) {
+                batchTimestamps.put(lastRequested + i, timestamp);
+            }
+        }
+
+        private void batchReceived(int id, int rows) {
+            this.rowsReceived += rows;
+
+            long time = System.nanoTime() - batchTimestamps.remove(id);
+            this.requestTotalTime += time;
+            this.requestMinTime = Math.min(time, requestMinTime);
+            this.requestMaxTime = Math.max(time, requestMaxTime);
+        }
+    }
+
+    @Override
+    @TestOnly
+    public void dumpNodeMetrics(IgniteStringBuilder writer, String indent) {
+        // Calculate aggregated statistics.
+        onRowsReceived(perNodeBuffers.values().stream().mapToLong(n -> 
n.rowsReceived).sum());
+
+        writer.app(indent);
+        dumpMetrics0(writer);
+        writer.app(", fragmentId=").app(srcFragmentId);
+        writer.nl();
+
+        String childIndent = Debuggable.childIndentation(indent);
+        for (String nodeName : srcNodeNames) {
+            perNodeBuffers.get(nodeName).dumpMetrics(writer, childIndent, 
nodeName);
+            writer.nl();
+        }
     }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java
index 38d4097b5e3..cd423dcab75 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java
@@ -27,6 +27,7 @@ import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.calcite.util.ImmutableIntList;
+import org.apache.ignite.internal.lang.IgniteStringBuilder;
 import org.apache.ignite.internal.sql.engine.api.expressions.RowFactory;
 import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
 import org.apache.ignite.internal.sql.engine.exec.PartitionProvider;
@@ -157,4 +158,10 @@ public class IndexScanNode<RowT> extends 
StorageScanNode<RowT> {
                 throw new AssertionError("Unexpected index type: " + 
schemaIndex.type());
         }
     }
+
+    @Override
+    protected void dumpMetrics0(IgniteStringBuilder writer) {
+        super.dumpMetrics0(writer);
+        writer.app(", indexName=").app(schemaIndex.name());
+    }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexSpoolNode.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexSpoolNode.java
index 6ba21783076..4d4f26c4626 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexSpoolNode.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexSpoolNode.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.sql.engine.exec.rel;
 import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
 
 import java.util.Comparator;
+import java.util.List;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
 import org.apache.calcite.rel.RelCollation;
@@ -81,6 +82,8 @@ public class IndexSpoolNode<RowT> extends AbstractNode<RowT> 
implements SingleNo
     /** {@inheritDoc} */
     @Override
     public void rewind() {
+        onRewind();
+
         rewindInternal();
     }
 
@@ -100,6 +103,8 @@ public class IndexSpoolNode<RowT> extends 
AbstractNode<RowT> implements SingleNo
         assert !nullOrEmpty(sources()) && sources().size() == 1;
         assert rowsCnt > 0;
 
+        onRequestReceived();
+
         if (!indexReady()) {
             requested = rowsCnt;
 
@@ -112,6 +117,8 @@ public class IndexSpoolNode<RowT> extends 
AbstractNode<RowT> implements SingleNo
     /** {@inheritDoc} */
     @Override
     public void push(RowT row) throws Exception {
+        onRowReceived();
+
         idx.push(row);
 
         waiting--;
@@ -205,4 +212,13 @@ public class IndexSpoolNode<RowT> extends 
AbstractNode<RowT> implements SingleNo
 
         return new IndexSpoolNode<>(ctx, idx, scan);
     }
+
+    @Override
+    public void dumpNodeMetrics(IgniteStringBuilder writer, String indent) {
+        writer.app(indent);
+        dumpMetrics0(writer);
+        writer.nl();
+
+        MetricsAwareNode.dumpChildNodesMetrics(writer, indent, List.of(scan));
+    }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/LimitNode.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/LimitNode.java
index 48f00e9026b..ed7757c4b8f 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/LimitNode.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/LimitNode.java
@@ -43,6 +43,9 @@ public class LimitNode<RowT> extends AbstractNode<RowT> 
implements SingleNode<Ro
     /** Upper requested rows. */
     private int requested;
 
+    // Metrics
+    private long filteredRows;
+
     /**
      * Constructor.
      *
@@ -66,6 +69,8 @@ public class LimitNode<RowT> extends AbstractNode<RowT> 
implements SingleNode<Ro
         assert !nullOrEmpty(sources()) && sources().size() == 1;
         assert rowsCnt > 0;
 
+        onRequestReceived();
+
         if (!hasMoreData()) {
             end();
 
@@ -91,18 +96,21 @@ public class LimitNode<RowT> extends AbstractNode<RowT> 
implements SingleNode<Ro
     /** {@inheritDoc} */
     @Override
     public void push(RowT row) throws Exception {
+        onRowReceived();
+
         if (waiting == NOT_WAITING) {
+            onRowFiltered();
             return;
         }
 
         --waiting;
 
-        if (rowsProcessed >= offset) {
-            if (hasMoreData()) {
-                // this two rows can`t be swapped, cause if all requested rows 
have been pushed it will trigger further request call.
-                --requested;
-                downstream().push(row);
-            }
+        if (rowsProcessed >= offset && hasMoreData()) {
+            // this two rows can`t be swapped, cause if all requested rows 
have been pushed it will trigger further request call.
+            --requested;
+            downstream().push(row);
+        } else {
+            onRowFiltered();
         }
 
         ++rowsProcessed;
@@ -120,6 +128,10 @@ public class LimitNode<RowT> extends AbstractNode<RowT> 
implements SingleNode<Ro
         }
     }
 
+    private long onRowFiltered() {
+        return filteredRows++;
+    }
+
     /** {@inheritDoc} */
     @Override
     public void end() throws Exception {
@@ -162,6 +174,12 @@ public class LimitNode<RowT> extends AbstractNode<RowT> 
implements SingleNode<Ro
                 .app(", rowsProcessed=").app(rowsProcessed);
     }
 
+    @Override
+    protected void dumpMetrics0(IgniteStringBuilder writer) {
+        super.dumpMetrics0(writer);
+        writer.app(", filteredRows=").app(filteredRows);
+    }
+
     /** {@code True} if fetch is undefined, or current rows processed is less 
than required. */
     private boolean hasMoreData() {
         return fetchUndefined || rowsProcessed < fetch + offset;
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinNode.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinNode.java
index 3e0c40c9b00..ada91778f69 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinNode.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinNode.java
@@ -53,6 +53,10 @@ public abstract class MergeJoinNode<RowT> extends 
AbstractNode<RowT> {
 
     protected boolean inLoop;
 
+    // Metrics
+    private long receivedRowsFromLeft = 0L;
+    private long receiveRowsFromRight = 0L;
+
     /**
      * Creates MergeJoinNode.
      *
@@ -71,6 +75,8 @@ public abstract class MergeJoinNode<RowT> extends 
AbstractNode<RowT> {
         assert !nullOrEmpty(sources()) && sources().size() == 2;
         assert rowsCnt > 0 && requested == 0;
 
+        onRequestReceived();
+
         requested = rowsCnt;
 
         if (!inLoop) {
@@ -149,6 +155,8 @@ public abstract class MergeJoinNode<RowT> extends 
AbstractNode<RowT> {
         assert downstream() != null;
         assert waitingLeft > 0;
 
+        onRowReceivedFromLeft();
+
         waitingLeft--;
 
         leftInBuf.add(row);
@@ -162,6 +170,8 @@ public abstract class MergeJoinNode<RowT> extends 
AbstractNode<RowT> {
         assert downstream() != null;
         assert waitingRight > 0;
 
+        onRowReceivedFromRight();
+
         waitingRight--;
 
         rightInBuf.add(row);
@@ -1260,4 +1270,22 @@ public abstract class MergeJoinNode<RowT> extends 
AbstractNode<RowT> {
             }
         }
     }
+
+    @Override
+    protected void dumpMetrics0(IgniteStringBuilder writer) {
+        // Calculate aggregated statistics.
+        onRowsReceived(receivedRowsFromLeft + receiveRowsFromRight);
+
+        super.dumpMetrics0(writer);
+        writer.app(", leftRows=").app(receivedRowsFromLeft)
+                .app(", rightRows=").app(receiveRowsFromRight);
+    }
+
+    private void onRowReceivedFromLeft() {
+        receivedRowsFromLeft++;
+    }
+
+    private void onRowReceivedFromRight() {
+        receiveRowsFromRight++;
+    }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/MetricsAwareNode.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/MetricsAwareNode.java
new file mode 100644
index 00000000000..34cde41e12d
--- /dev/null
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/MetricsAwareNode.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.rel;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.lang.Debuggable;
+import org.apache.ignite.internal.lang.IgniteStringBuilder;
+
+/**
+ * Interface provides method for dumping query metrics.
+ */
+interface MetricsAwareNode {
+    /** Helper method to dump child nodes metrics. */
+    static void dumpChildNodesMetrics(IgniteStringBuilder writer, String 
indent, Iterable<? extends MetricsAwareNode> nodes) {
+        if (nodes != null) {
+            String childIndent = Debuggable.childIndentation(indent);
+            nodes.forEach(s -> s.dumpNodeMetrics(writer, childIndent));
+        }
+    }
+
+    /** Helper method for converting time in nanos to a string. */
+    static String beautifyNanoTime(long nanos) {
+        return TimeUnit.NANOSECONDS.toMillis(nanos) + "ms";
+    }
+
+    /**
+     * Dump current node metrics to a given writer.
+     * Note: node should bother writing provided indentation at each new line.
+     */
+    void dumpNodeMetrics(IgniteStringBuilder writer, String indent);
+}
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNode.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNode.java
index 1a0412ffea0..54b3af227ab 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNode.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNode.java
@@ -157,6 +157,8 @@ public class ModifyNode<RowT> extends AbstractNode<RowT> 
implements SingleNode<R
         assert !nullOrEmpty(sources()) && sources().size() == 1;
         assert rowsCnt > 0 && requested == 0;
 
+        onRequestReceived();
+
         requested = rowsCnt;
 
         requestNextBatchIfNeeded();
@@ -168,6 +170,8 @@ public class ModifyNode<RowT> extends AbstractNode<RowT> 
implements SingleNode<R
         assert downstream() != null;
         assert waiting > 0;
 
+        onRowReceived();
+
         waiting--;
 
         rows.add(row);
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Node.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Node.java
index 23934da2f05..a10cb19b364 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Node.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Node.java
@@ -28,7 +28,7 @@ import 
org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
  * <b>Note</b>: except several cases (like consumer node and mailboxes), 
{@link Node#request(int)}, {@link Node#close()},
  * {@link Downstream#push(Object)} and {@link Downstream#end()} methods should 
be used from one single thread.
  */
-public interface Node<RowT> extends AutoCloseable, Debuggable {
+public interface Node<RowT> extends AutoCloseable, Debuggable, 
MetricsAwareNode {
     /**
      * Returns runtime context allowing access to the tables in a database.
      *
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java
index 1e143cb973e..3411c035cf7 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java
@@ -159,6 +159,8 @@ public class Outbox<RowT> extends AbstractNode<RowT> 
implements Mailbox<RowT>, S
     public void push(RowT row) throws Exception {
         assert waiting > 0 : waiting;
 
+        onRowReceived();
+
         waiting--;
 
         if (currentNode == null || dest.targets(row).contains(currentNode)) {
@@ -176,6 +178,10 @@ public class Outbox<RowT> extends AbstractNode<RowT> 
implements Mailbox<RowT>, S
         waiting = NOT_WAITING;
 
         flush();
+
+        if (ExecutionContext.DUMP_METRICS) {
+            dumpFragmentMetrics();
+        }
     }
 
     /** {@inheritDoc} */
@@ -189,6 +195,10 @@ public class Outbox<RowT> extends AbstractNode<RowT> 
implements Mailbox<RowT>, S
     /** {@inheritDoc} */
     @Override
     public void closeInternal() {
+        if (waiting != NOT_WAITING && ExecutionContext.DUMP_METRICS) {
+            dumpFragmentMetrics();
+        }
+
         super.closeInternal();
 
         registry.unregister(this);
@@ -496,6 +506,11 @@ public class Outbox<RowT> extends AbstractNode<RowT> 
implements Mailbox<RowT>, S
         private @Nullable List<RowT> curr;
         private int pendingCount;
 
+        // Metrics
+        long batchesRequested = 0L;
+        long rowsReceived = 0L;
+        long rowsSent = 0L;
+
         private RemoteDownstream(String nodeName, BatchSender<RowT> sender) {
             this.nodeName = nodeName;
             this.sender = sender;
@@ -519,6 +534,8 @@ public class Outbox<RowT> extends AbstractNode<RowT> 
implements Mailbox<RowT>, S
         void onBatchRequested(int amountOfBatches) throws Exception {
             assert amountOfBatches > 0 : amountOfBatches;
 
+            batchesRequested += amountOfBatches;
+
             this.pendingCount += amountOfBatches;
 
             // if there is a batch which is ready to be sent, then just sent it
@@ -541,6 +558,8 @@ public class Outbox<RowT> extends AbstractNode<RowT> 
implements Mailbox<RowT>, S
             assert ready() : state;
             assert curr != null;
 
+            rowsReceived++;
+
             curr.add(row);
 
             if (curr.size() == IO_BATCH_SIZE) {
@@ -560,6 +579,8 @@ public class Outbox<RowT> extends AbstractNode<RowT> 
implements Mailbox<RowT>, S
 
             boolean lastBatch = state == State.LAST_BATCH;
 
+            rowsSent += curr.size();
+
             sender.send(nodeName, ++lastSentBatchId, lastBatch, curr);
 
             pendingCount--;
@@ -612,4 +633,37 @@ public class Outbox<RowT> extends AbstractNode<RowT> 
implements Mailbox<RowT>, S
             this.amountOfBatches = amountOfBatches;
         }
     }
+
+    private void dumpFragmentMetrics() {
+        IgniteStringBuilder sb = new IgniteStringBuilder("Dump metrics for 
executed query:")
+                .app(" queryId=").app(context().queryId())
+                .app(", fragmentId=").app(context().fragmentId())
+                .app(" nodeName=").app(context().localNode().name())
+                .nl();
+
+        dumpNodeMetrics(sb, "");
+
+        LOG.info(sb.toString());
+    }
+
+    @Override
+    public void dumpNodeMetrics(IgniteStringBuilder writer, String indent) {
+        writer.app(indent);
+        dumpMetrics0(writer);
+        writer.nl();
+
+        String childIndent = Debuggable.childIndentation("");
+        String childIndent2 = Debuggable.childIndentation(childIndent);
+
+        for (Entry<String, RemoteDownstream<RowT>> entry : 
this.nodeBuffers.entrySet()) {
+            RemoteDownstream<RowT> downstream = entry.getValue();
+            writer.app(childIndent2)
+                    .app("Node: nodeName=").app(entry.getKey())
+                    .app(", rowsSent=").app(downstream.rowsSent)
+                    .app(", 
batchesRequested=").app(downstream.batchesRequested);
+            writer.nl();
+        }
+
+        MetricsAwareNode.dumpChildNodesMetrics(writer, childIndent, sources());
+    }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ProjectNode.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ProjectNode.java
index 376c96ae4ad..ea731e973e1 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ProjectNode.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ProjectNode.java
@@ -55,6 +55,8 @@ public class ProjectNode<RowT> extends AbstractNode<RowT> 
implements SingleNode<
         assert !nullOrEmpty(sources()) && sources().size() == 1;
         assert rowsCnt > 0;
 
+        onRequestReceived();
+
         source().request(rowsCnt);
     }
 
@@ -63,6 +65,8 @@ public class ProjectNode<RowT> extends AbstractNode<RowT> 
implements SingleNode<
     public void push(RowT row) throws Exception {
         assert downstream() != null;
 
+        onRowReceived();
+
         downstream().push(prj.apply(row));
     }
 
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/RootNode.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/RootNode.java
index fe17d26c01f..b5f0e438f51 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/RootNode.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/RootNode.java
@@ -141,6 +141,8 @@ public class RootNode<RowT> extends AbstractNode<RowT> 
implements SingleNode<Row
         try {
             assert waiting > 0;
 
+            onRowReceived();
+
             waiting--;
 
             inBuff.offer(row);
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ScanNode.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ScanNode.java
index 36632a0935b..7291b452e02 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ScanNode.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ScanNode.java
@@ -66,6 +66,8 @@ public class ScanNode<RowT> extends AbstractNode<RowT> 
implements SingleNode<Row
     public void request(int rowsCnt) throws Exception {
         assert rowsCnt > 0 && requested == 0 : "rowsCnt=" + rowsCnt + ", 
requested=" + requested;
 
+        onRequestReceived();
+
         requested = rowsCnt;
 
         if (!inLoop) {
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/SortAggregateNode.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/SortAggregateNode.java
index 6ea047b2d08..1c46c80f8cb 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/SortAggregateNode.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/SortAggregateNode.java
@@ -105,6 +105,8 @@ public class SortAggregateNode<RowT> extends 
AbstractNode<RowT> implements Singl
         assert !nullOrEmpty(sources()) && sources().size() == 1;
         assert rowsCnt > 0 && requested == 0;
 
+        onRequestReceived();
+
         requested = rowsCnt;
 
         if (waiting == 0) {
@@ -120,6 +122,8 @@ public class SortAggregateNode<RowT> extends 
AbstractNode<RowT> implements Singl
         assert downstream() != null;
         assert waiting > 0;
 
+        onRowReceived();
+
         waiting--;
 
         if (grp != null) {
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/SortNode.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/SortNode.java
index e9b9a6bed36..5900270c3cd 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/SortNode.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/SortNode.java
@@ -126,6 +126,8 @@ public class SortNode<RowT> extends AbstractNode<RowT> 
implements SingleNode<Row
         assert rowsCnt > 0 && requested == 0;
         assert waiting <= 0;
 
+        onRequestReceived();
+
         if (fetch == 0) {
             downstream().end();
 
@@ -148,6 +150,8 @@ public class SortNode<RowT> extends AbstractNode<RowT> 
implements SingleNode<Row
         assert waiting > 0;
         assert reversed == null || reversed.isEmpty();
 
+        onRowReceived();
+
         waiting--;
 
         rows.add(row);
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/StorageScanNode.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/StorageScanNode.java
index ae248273261..a131d31907b 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/StorageScanNode.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/StorageScanNode.java
@@ -36,7 +36,7 @@ import org.jetbrains.annotations.Nullable;
 public abstract class StorageScanNode<RowT> extends AbstractNode<RowT> {
     private Queue<RowT> inBuff = new LinkedBlockingQueue<>(inBufSize);
 
-    private final @Nullable Predicate<RowT> filters;
+    private final Predicate<RowT> filters;
 
     private final @Nullable Function<RowT, RowT> rowTransformer;
 
@@ -51,6 +51,11 @@ public abstract class StorageScanNode<RowT> extends 
AbstractNode<RowT> {
     /** Flag that indicate scan method was called already. */
     private boolean dataRequested;
 
+    // Metrics
+    private long filteredRows = 0L;
+    private long scanStartTime = -1L;
+    private long scanTime = 0L;
+
     /**
      * Constructor.
      *
@@ -76,6 +81,8 @@ public abstract class StorageScanNode<RowT> extends 
AbstractNode<RowT> {
     public void request(int rowsCnt) throws Exception {
         assert rowsCnt > 0 && requested == 0 : "rowsCnt=" + rowsCnt + ", 
requested=" + requested;
 
+        onRequestReceived();
+
         requested = rowsCnt;
 
         if (!inLoop) {
@@ -136,6 +143,7 @@ public abstract class StorageScanNode<RowT> extends 
AbstractNode<RowT> {
                     RowT row = inBuff.poll();
 
                     if (filters != null && !filters.test(row)) {
+                        onRowFiltered();
                         continue;
                     }
 
@@ -180,6 +188,8 @@ public abstract class StorageScanNode<RowT> extends 
AbstractNode<RowT> {
             waiting = inBufSize - inBuff.size();
         }
 
+        onScanStarted();
+
         Subscription subscription = this.activeSubscription;
         if (subscription != null) {
             subscription.request(waiting);
@@ -233,8 +243,12 @@ public abstract class StorageScanNode<RowT> extends 
AbstractNode<RowT> {
             // It is safe not to be aware about already closed execution flow.
             inBuffInner.add(row);
 
-            if (inBuffInner.size() == inBufSize) {
+            int size = inBuffInner.size();
+            if (size == inBufSize) {
                 StorageScanNode.this.execute(() -> {
+                    onRowsReceived(size);
+                    onScanFinished();
+
                     waiting = 0;
                     push();
                 });
@@ -253,6 +267,9 @@ public abstract class StorageScanNode<RowT> extends 
AbstractNode<RowT> {
         @Override
         public void onComplete() {
             StorageScanNode.this.execute(() -> {
+                onRowsReceived(inBuff.size());
+                onScanFinished();
+
                 activeSubscription = null;
                 waiting = 0;
 
@@ -260,4 +277,38 @@ public abstract class StorageScanNode<RowT> extends 
AbstractNode<RowT> {
             });
         }
     }
+
+    @Override
+    protected void dumpMetrics0(IgniteStringBuilder writer) {
+        writer.app(this.getClass().getSimpleName()).app(": ");
+
+        writer.app("scannedRows=").app(receivedRowsCount);
+
+        if (requestCount > 0) {
+            writer.app(", requests=").app(requestCount);
+        }
+
+        if (rewindCount > 0) {
+            writer.app(", rewinds=").app(rewindCount);
+        }
+
+        if (filteredRows > 0) {
+            writer.app(", filteredRows=").app(filteredRows);
+        }
+
+        writer.app(", 
scanTime=").app(MetricsAwareNode.beautifyNanoTime(scanTime));
+    }
+
+    private void onRowFiltered() {
+        filteredRows++;
+    }
+
+    private void onScanStarted() {
+        scanStartTime = System.nanoTime();
+    }
+
+    private void onScanFinished() {
+        scanTime += System.nanoTime() - scanStartTime;
+        scanStartTime = -1L;
+    }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNode.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNode.java
index 25e6500ef79..afc559a5331 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNode.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNode.java
@@ -23,11 +23,13 @@ import java.util.concurrent.Flow.Publisher;
 import java.util.function.Function;
 import java.util.function.Predicate;
 import org.apache.calcite.util.ImmutableIntList;
+import org.apache.ignite.internal.lang.IgniteStringBuilder;
 import org.apache.ignite.internal.sql.engine.api.expressions.RowFactory;
 import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
 import org.apache.ignite.internal.sql.engine.exec.PartitionProvider;
 import 
org.apache.ignite.internal.sql.engine.exec.PartitionWithConsistencyToken;
 import org.apache.ignite.internal.sql.engine.exec.ScannableTable;
+import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
 import org.apache.ignite.internal.util.SubscriptionUtils;
 import org.apache.ignite.internal.util.TransformingIterator;
 import org.jetbrains.annotations.Nullable;
@@ -47,11 +49,14 @@ public class TableScanNode<RowT> extends 
StorageScanNode<RowT> {
 
     private final int @Nullable [] requiredColumns;
 
+    private final String tableName;
+
     /**
      * Constructor.
      *
      * @param ctx Execution context.
      * @param rowFactory Row factory.
+     * @param schemaTable Schema table.
      * @param table Internal table.
      * @param partitionProvider List of pairs containing the partition number 
to scan with the corresponding enlistment
      *         consistency token.
@@ -62,6 +67,7 @@ public class TableScanNode<RowT> extends 
StorageScanNode<RowT> {
     public TableScanNode(
             ExecutionContext<RowT> ctx,
             RowFactory<RowT> rowFactory,
+            IgniteTable schemaTable,
             ScannableTable table,
             PartitionProvider<RowT> partitionProvider,
             @Nullable Predicate<RowT> filters,
@@ -74,6 +80,7 @@ public class TableScanNode<RowT> extends 
StorageScanNode<RowT> {
         this.partitionProvider = partitionProvider;
         this.rowFactory = rowFactory;
         this.requiredColumns = requiredColumns == null ? null : 
requiredColumns.toIntArray();
+        this.tableName = schemaTable.name();
     }
 
     /** {@inheritDoc} */
@@ -86,4 +93,10 @@ public class TableScanNode<RowT> extends 
StorageScanNode<RowT> {
 
         return SubscriptionUtils.concat(it);
     }
+
+    @Override
+    protected void dumpMetrics0(IgniteStringBuilder writer) {
+        super.dumpMetrics0(writer);
+        writer.app(", tableName=").app(tableName);
+    }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/TableSpoolNode.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/TableSpoolNode.java
index ec45a488875..ce793912c54 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/TableSpoolNode.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/TableSpoolNode.java
@@ -77,6 +77,8 @@ public class TableSpoolNode<RowT> extends AbstractNode<RowT> 
implements SingleNo
     /** {@inheritDoc} */
     @Override
     public void rewind() {
+        onRewind();
+
         rewindInternal();
     }
 
@@ -142,6 +144,8 @@ public class TableSpoolNode<RowT> extends 
AbstractNode<RowT> implements SingleNo
         assert downstream() != null;
         assert waiting > 0;
 
+        onRowReceived();
+
         waiting--;
 
         rows.add(row);
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/UnionAllNode.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/UnionAllNode.java
index 1fbfe213bd7..d41e7bd074c 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/UnionAllNode.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/UnionAllNode.java
@@ -56,6 +56,8 @@ public class UnionAllNode<RowT> extends AbstractNode<RowT> 
implements Downstream
         assert !nullOrEmpty(sources());
         assert rowsCnt > 0 && waiting == 0;
 
+        onRequestReceived();
+
         source().request(waiting = rowsCnt);
     }
 
@@ -65,6 +67,8 @@ public class UnionAllNode<RowT> extends AbstractNode<RowT> 
implements Downstream
         assert downstream() != null;
         assert waiting > 0;
 
+        onRowReceived();
+
         waiting--;
 
         downstream().push(row);
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNodeTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNodeTest.java
index 45109a745b6..d4e1adc4d5a 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNodeTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNodeTest.java
@@ -80,7 +80,7 @@ class AsyncRootNodeTest extends 
AbstractExecutionTest<RowWrapper> {
                 null
         );
 
-        var rootNode = new AsyncRootNode<>(dataSourceScanNode, 
Function.identity());
+        var rootNode = new AsyncRootNode<>(context, dataSourceScanNode, 
Function.identity());
 
         rootNode.requestNextAsync(1);
 
@@ -123,7 +123,7 @@ class AsyncRootNodeTest extends 
AbstractExecutionTest<RowWrapper> {
                 null
         );
 
-        AsyncRootNode<RowWrapper, RowWrapper> rootNode = new 
AsyncRootNode<>(dataSourceScanNode, Function.identity());
+        AsyncRootNode<RowWrapper, RowWrapper> rootNode = new 
AsyncRootNode<>(context, dataSourceScanNode, Function.identity());
         dataSourceScanNode.onRegister(rootNode);
 
         // trigger prefetch
@@ -171,7 +171,7 @@ class AsyncRootNodeTest extends 
AbstractExecutionTest<RowWrapper> {
             return IntStream.range(0, 76).mapToObj(factory::create).iterator();
         });
 
-        AsyncRootNode<RowWrapper, RowWrapper> rootNode = new 
AsyncRootNode<>(scanNode, Function.identity());
+        AsyncRootNode<RowWrapper, RowWrapper> rootNode = new 
AsyncRootNode<>(context, scanNode, Function.identity());
         scanNode.onRegister(rootNode);
 
         // trigger prefetch
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExchangeExecutionTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExchangeExecutionTest.java
index 403422e9d72..614981419ba 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExchangeExecutionTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExchangeExecutionTest.java
@@ -531,7 +531,7 @@ public class ExchangeExecutionTest extends 
AbstractExecutionTest<Object[]> {
         }
 
         RewindableAsyncRoot<Object[], Object[]> root = new 
RewindableAsyncRoot<>(
-                node, Function.identity()
+                targetCtx, node, Function.identity()
         );
 
         node.onRegister(root);
@@ -665,8 +665,8 @@ public class ExchangeExecutionTest extends 
AbstractExecutionTest<Object[]> {
          * @param source A source to requests rows from.
          * @param converter A converter to convert rows from an internal 
format to desired output format.
          */
-        RewindableAsyncRoot(AbstractNode<InT> source, Function<InT, OutT> 
converter) {
-            super(source, converter);
+        RewindableAsyncRoot(ExecutionContext<InT> ctx, AbstractNode<InT> 
source, Function<InT, OutT> converter) {
+            super(ctx, source, converter);
         }
 
         @Override
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExecutionTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExecutionTest.java
index 8394a75b869..fb61d4e6ebe 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExecutionTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExecutionTest.java
@@ -427,6 +427,11 @@ public class ExecutionTest extends 
AbstractExecutionTest<Object[]> {
         public void dumpState(IgniteStringBuilder writer, String indent) {
             
writer.app(indent).app("class=").app(getClass().getSimpleName()).nl();
         }
+
+        @Override
+        public void dumpNodeMetrics(IgniteStringBuilder writer, String indent) 
{
+            writer.app(indent).app(getClass().getSimpleName()).nl();
+        }
     }
 
     @Override
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
index 5db917e662c..7c211a55ff7 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
@@ -79,6 +79,7 @@ import 
org.apache.ignite.internal.sql.engine.exec.TableRowConverter;
 import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler;
 import org.apache.ignite.internal.sql.engine.framework.DataProvider;
 import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
+import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
 import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
 import org.apache.ignite.internal.sql.engine.util.TypeUtils;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
@@ -228,7 +229,8 @@ public class TableScanNodeExecutionTest extends 
AbstractExecutionTest<Object[]>
             };
             ScannableTableImpl scanableTable = new 
ScannableTableImpl(internalTable, rf -> rowConverter);
             PartitionProvider<Object[]> partitionProvider = 
PartitionProvider.fromPartitions(partsWithConsistencyTokens);
-            TableScanNode<Object[]> scanNode = new TableScanNode<>(ctx, 
rowFactory, scanableTable,
+            IgniteTable schemaTable = mock(IgniteTable.class);
+            TableScanNode<Object[]> scanNode = new TableScanNode<>(ctx, 
rowFactory, schemaTable, scanableTable,
                     partitionProvider, null, null, null);
 
             RootNode<Object[]> root = new RootNode<>(ctx);
@@ -282,7 +284,9 @@ public class TableScanNodeExecutionTest extends 
AbstractExecutionTest<Object[]>
         RowFactory<Object[]> rowFactory = 
ctx.rowFactoryFactory().create(schema);
 
         ScannableTable scannableTable = 
TestBuilders.tableScan(DataProvider.fromRow(new Object[]{42}, partDataSize));
-        TableScanNode<Object[]> scanNode = new TableScanNode<>(ctx, 
rowFactory, scannableTable, c -> partitions, null, null, null);
+        IgniteTable schemaTable = mock(IgniteTable.class);
+        TableScanNode<Object[]> scanNode = new TableScanNode<>(ctx, 
rowFactory, schemaTable, scannableTable,
+                c -> partitions, null, null, null);
         RootNode<Object[]> rootNode = new RootNode<>(ctx);
 
         rootNode.register(scanNode);

Reply via email to