This is an automated email from the ASF dual-hosted git repository.
amashenkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new f074f3abef8 IGNITE-27981 SQL. Introduce query execution metrics for
debug purposes (#7663)
f074f3abef8 is described below
commit f074f3abef88d678941e861d17827c2a60ee72fd
Author: Andrew V. Mashenkov <[email protected]>
AuthorDate: Mon Mar 2 12:39:27 2026 +0300
IGNITE-27981 SQL. Introduce query execution metrics for debug purposes
(#7663)
---
.../internal/sql/engine/exec/ExecutionContext.java | 3 +
.../sql/engine/exec/ExecutionServiceImpl.java | 1 +
.../sql/engine/exec/LogicalRelImplementor.java | 1 +
.../internal/sql/engine/exec/rel/AbstractNode.java | 46 ++++++++++++++
.../rel/AbstractRightMaterializedJoinNode.java | 28 +++++++++
.../sql/engine/exec/rel/AbstractSetOpNode.java | 4 ++
.../sql/engine/exec/rel/AsyncRootNode.java | 70 +++++++++++++++++++++-
.../exec/rel/CorrelatedNestedLoopJoinNode.java | 28 +++++++++
.../internal/sql/engine/exec/rel/FilterNode.java | 19 ++++++
.../sql/engine/exec/rel/HashAggregateNode.java | 4 ++
.../ignite/internal/sql/engine/exec/rel/Inbox.java | 64 ++++++++++++++++++++
.../sql/engine/exec/rel/IndexScanNode.java | 7 +++
.../sql/engine/exec/rel/IndexSpoolNode.java | 16 +++++
.../internal/sql/engine/exec/rel/LimitNode.java | 30 ++++++++--
.../sql/engine/exec/rel/MergeJoinNode.java | 28 +++++++++
.../sql/engine/exec/rel/MetricsAwareNode.java | 46 ++++++++++++++
.../internal/sql/engine/exec/rel/ModifyNode.java | 4 ++
.../ignite/internal/sql/engine/exec/rel/Node.java | 2 +-
.../internal/sql/engine/exec/rel/Outbox.java | 54 +++++++++++++++++
.../internal/sql/engine/exec/rel/ProjectNode.java | 4 ++
.../internal/sql/engine/exec/rel/RootNode.java | 2 +
.../internal/sql/engine/exec/rel/ScanNode.java | 2 +
.../sql/engine/exec/rel/SortAggregateNode.java | 4 ++
.../internal/sql/engine/exec/rel/SortNode.java | 4 ++
.../sql/engine/exec/rel/StorageScanNode.java | 55 ++++++++++++++++-
.../sql/engine/exec/rel/TableScanNode.java | 13 ++++
.../sql/engine/exec/rel/TableSpoolNode.java | 4 ++
.../internal/sql/engine/exec/rel/UnionAllNode.java | 4 ++
.../sql/engine/exec/rel/AsyncRootNodeTest.java | 6 +-
.../sql/engine/exec/rel/ExchangeExecutionTest.java | 6 +-
.../sql/engine/exec/rel/ExecutionTest.java | 5 ++
.../exec/rel/TableScanNodeExecutionTest.java | 8 ++-
32 files changed, 554 insertions(+), 18 deletions(-)
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
index d66109f15a9..ef569080ed6 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
@@ -36,6 +36,7 @@ import org.apache.calcite.linq4j.QueryProvider;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.lang.IgniteSystemProperties;
import org.apache.ignite.internal.lang.RunnableX;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
@@ -64,6 +65,8 @@ import org.jetbrains.annotations.Nullable;
public class ExecutionContext<RowT> implements SqlEvaluationContext<RowT> {
private static final IgniteLogger LOG =
Loggers.forClass(ExecutionContext.class);
+ public static final boolean DUMP_METRICS =
IgniteSystemProperties.getBoolean("IGNITE_DUMP_QUERY_METRICS_TO_LOGS", false);
+
/**
* TODO: https://issues.apache.org/jira/browse/IGNITE-15276 Support other
locales.
*/
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index ce1e20f4a9c..35fbc350053 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -1054,6 +1054,7 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, LogicalTopo
);
AsyncRootNode<RowT, InternalSqlRow> rootNode = new
AsyncRootNode<>(
+ ectx,
node,
inRow -> new InternalSqlRowImpl<>(inRow,
ectx.rowAccessor(), internalTypeConverter));
node.onRegister(rootNode);
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
index 525f55e7f9b..4a6305b0cce 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
@@ -575,6 +575,7 @@ public class LogicalRelImplementor<RowT> implements
IgniteRelVisitor<Node<RowT>>
return new TableScanNode<>(
ctx,
rowFactory,
+ tbl,
scannableTable,
partitionProvider,
filters,
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractNode.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractNode.java
index e810e1c20dd..9c4b535fe10 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractNode.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractNode.java
@@ -62,6 +62,11 @@ public abstract class AbstractNode<RowT> implements
Node<RowT> {
private List<Node<RowT>> sources;
+ // Metrics
+ protected int requestCount = 0;
+ protected int rewindCount = 0;
+ protected long receivedRowsCount = 0L;
+
/**
* Constructor.
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
@@ -112,6 +117,8 @@ public abstract class AbstractNode<RowT> implements
Node<RowT> {
/** {@inheritDoc} */
@Override
public void rewind() {
+ onRewind();
+
rewindInternal();
if (!nullOrEmpty(sources())) {
@@ -219,4 +226,43 @@ public abstract class AbstractNode<RowT> implements
Node<RowT> {
Debuggable.dumpState(writer, Debuggable.childIndentation(indent),
sources);
}
}
+
+ @Override
+ public void dumpNodeMetrics(IgniteStringBuilder writer, String indent) {
+ writer.app(indent);
+ dumpMetrics0(writer);
+ writer.nl();
+
+ MetricsAwareNode.dumpChildNodesMetrics(writer, indent, sources);
+ }
+
+ protected void dumpMetrics0(IgniteStringBuilder writer) {
+ writer.app(this.getClass().getSimpleName()).app(": ");
+
+ writer.app("receivedRows=").app(receivedRowsCount);
+
+ if (requestCount > 0) {
+ writer.app(", requests=").app(requestCount);
+ }
+
+ if (rewindCount > 0) {
+ writer.app(", rewinds=").app(rewindCount);
+ }
+ }
+
+ protected final void onRequestReceived() {
+ requestCount++;
+ }
+
+ protected final void onRowReceived() {
+ receivedRowsCount++;
+ }
+
+ protected final void onRowsReceived(long rowsCount) {
+ receivedRowsCount += rowsCount;
+ }
+
+ protected final void onRewind() {
+ rewindCount++;
+ }
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractRightMaterializedJoinNode.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractRightMaterializedJoinNode.java
index 1e81985add5..0a729417df6 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractRightMaterializedJoinNode.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractRightMaterializedJoinNode.java
@@ -34,6 +34,10 @@ public abstract class
AbstractRightMaterializedJoinNode<RowT> extends AbstractNo
final Deque<RowT> leftInBuf = new ArrayDeque<>(inBufSize);
protected @Nullable RowT left;
+ // Metrics
+ private long receivedRowsFromLeft = 0L;
+ private long receiveRowsFromRight = 0L;
+
AbstractRightMaterializedJoinNode(ExecutionContext<RowT> ctx) {
super(ctx);
}
@@ -44,6 +48,8 @@ public abstract class AbstractRightMaterializedJoinNode<RowT>
extends AbstractNo
assert !nullOrEmpty(sources()) && sources().size() == 2;
assert rowsCnt > 0 && requested == 0;
+ onRequestReceived();
+
requested = rowsCnt;
if (!inLoop) {
@@ -70,6 +76,8 @@ public abstract class AbstractRightMaterializedJoinNode<RowT>
extends AbstractNo
/** {@inheritDoc} */
@Override
public void push(RowT row) throws Exception {
+ onRowReceivedFromLeft();
+
pushLeft(row);
}
@@ -90,6 +98,8 @@ public abstract class AbstractRightMaterializedJoinNode<RowT>
extends AbstractNo
/** {@inheritDoc} */
@Override
public void push(RowT row) throws Exception {
+ onRowReceivedFromRight();
+
pushRight(row);
}
@@ -158,4 +168,22 @@ public abstract class
AbstractRightMaterializedJoinNode<RowT> extends AbstractNo
protected abstract void join() throws Exception;
protected abstract void pushRight(RowT row) throws Exception;
+
+ @Override
+ protected void dumpMetrics0(IgniteStringBuilder writer) {
+ // Calculate aggregated statistics.
+ onRowsReceived(receivedRowsFromLeft + receiveRowsFromRight);
+
+ super.dumpMetrics0(writer);
+ writer.app(", leftRows=").app(receivedRowsFromLeft)
+ .app(", rightRows=").app(receiveRowsFromRight);
+ }
+
+ private void onRowReceivedFromLeft() {
+ receivedRowsFromLeft++;
+ }
+
+ private void onRowReceivedFromRight() {
+ receiveRowsFromRight++;
+ }
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractSetOpNode.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractSetOpNode.java
index c436f4fd999..79ea3281783 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractSetOpNode.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractSetOpNode.java
@@ -65,6 +65,8 @@ public abstract class AbstractSetOpNode<RowT> extends
AbstractNode<RowT> {
assert rowsCnt > 0 && requested == 0;
assert waiting <= 0;
+ onRequestReceived();
+
requested = rowsCnt;
if (waiting == 0) {
@@ -132,6 +134,8 @@ public abstract class AbstractSetOpNode<RowT> extends
AbstractNode<RowT> {
return new Downstream<>() {
@Override
public void push(RowT row) throws Exception {
+ onRowReceived();
+
AbstractSetOpNode.this.push(row, idx);
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java
index dcc33bdd1ba..23a5048996d 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java
@@ -24,12 +24,18 @@ import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
+import org.apache.ignite.internal.lang.Debuggable;
+import org.apache.ignite.internal.lang.IgniteStringBuilder;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.sql.engine.QueryCancelledException;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
import org.apache.ignite.internal.util.AsyncCursor;
import org.apache.ignite.lang.CursorClosedException;
import org.jetbrains.annotations.Nullable;
@@ -38,6 +44,7 @@ import org.jetbrains.annotations.Nullable;
* An async iterator over the execution tree.
*/
public class AsyncRootNode<InRowT, OutRowT> implements Downstream<InRowT>,
AsyncCursor<OutRowT> {
+ public static final IgniteLogger LOGGER =
Loggers.forClass(AsyncRootNode.class);
private final CompletableFuture<Void> cancelFut = new
CompletableFuture<>();
/**
@@ -69,15 +76,25 @@ public class AsyncRootNode<InRowT, OutRowT> implements
Downstream<InRowT>, Async
*/
private int waiting;
+ // Metrics
+ private long rowsReceived;
+ private long queryStartTime = -1L;
+ private long prefetchTime = -1L;
+ private long queryTime = -1L;
+ private final long fragmentId;
+ private final UUID queryId;
+
/**
* Constructor.
*
* @param source A source to requests rows from.
* @param converter A converter to convert rows from an internal format to
desired output format.
*/
- public AsyncRootNode(AbstractNode<InRowT> source, Function<InRowT,
OutRowT> converter) {
+ public AsyncRootNode(ExecutionContext<InRowT> ctx, AbstractNode<InRowT>
source, Function<InRowT, OutRowT> converter) {
this.source = source;
this.converter = converter;
+ queryId = ctx.queryId();
+ fragmentId = ctx.description().fragmentId();
}
/** {@inheritDoc} */
@@ -85,6 +102,8 @@ public class AsyncRootNode<InRowT, OutRowT> implements
Downstream<InRowT>, Async
public void push(InRowT row) throws Exception {
assert waiting > 0 : waiting;
+ onRowReceived();
+
buff.add(converter.apply(row));
if (--waiting == 0) {
@@ -104,6 +123,10 @@ public class AsyncRootNode<InRowT, OutRowT> implements
Downstream<InRowT>, Async
completePrefetchFuture(null);
flush();
+
+ if (ExecutionContext.DUMP_METRICS) {
+ dumpQueryMetrics();
+ }
}
/** {@inheritDoc} */
@@ -199,6 +222,8 @@ public class AsyncRootNode<InRowT, OutRowT> implements
Downstream<InRowT>, Async
public CompletableFuture<Void> startPrefetch() {
assert source.context().description().prefetch();
+ onQueryStarted();
+
if (waiting == 0) {
try {
source.checkState();
@@ -259,6 +284,7 @@ public class AsyncRootNode<InRowT, OutRowT> implements
Downstream<InRowT>, Async
} else if (waiting == NOT_WAITING) {
assert hasMore == HasMore.NO : hasMore;
+ onQueryFinish();
closeAsync();
}
} else if (!pendingRequests.isEmpty()) {
@@ -272,6 +298,8 @@ public class AsyncRootNode<InRowT, OutRowT> implements
Downstream<InRowT>, Async
private void scheduleTask() {
if (!pendingRequests.isEmpty() && taskScheduled.compareAndSet(false,
true)) {
source.execute(() -> {
+ onQueryStarted();
+
taskScheduled.set(false);
flush();
@@ -286,6 +314,8 @@ public class AsyncRootNode<InRowT, OutRowT> implements
Downstream<InRowT>, Async
*/
private void completePrefetchFuture(@Nullable Throwable ex) {
if (!prefetchFut.isDone()) {
+ onPrefetchFinished();
+
if (ex != null) {
prefetchFut.completeExceptionally(ex);
} else {
@@ -320,4 +350,42 @@ public class AsyncRootNode<InRowT, OutRowT> implements
Downstream<InRowT>, Async
private enum HasMore {
YES, NO, UNCERTAIN
}
+
+ private void dumpQueryMetrics() {
+ IgniteStringBuilder sb = new IgniteStringBuilder();
+ sb.app("Dump metrics for executed query:
queryId=").app(queryId).app(", fragmentId=").app(fragmentId).nl();
+ sb.app("RootNode: rows=").app(rowsReceived)
+ .app(",
prefetch=").app(MetricsAwareNode.beautifyNanoTime(prefetchTime))
+ .app(",
totalTime=").app(MetricsAwareNode.beautifyNanoTime((queryTime)))
+ .nl();
+
+ MetricsAwareNode.dumpChildNodesMetrics(sb,
Debuggable.childIndentation(""), List.of(source));
+
+ LOGGER.info(sb.toString());
+ }
+
+ private void onRowReceived() {
+ rowsReceived++;
+ }
+
+ private void onQueryStarted() {
+ if (queryStartTime == -1) {
+ queryStartTime = System.nanoTime();
+ }
+ }
+
+ private void onPrefetchFinished() {
+ if (prefetchTime == -1) {
+ prefetchTime = System.nanoTime() - queryStartTime;
+ }
+ }
+
+ private void onQueryFinish() {
+ if (queryStartTime != -1) {
+ onPrefetchFinished();
+
+ queryTime += System.nanoTime() - queryStartTime;
+ queryStartTime = -1;
+ }
+ }
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/CorrelatedNestedLoopJoinNode.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/CorrelatedNestedLoopJoinNode.java
index 0d269190deb..96cc7d0246c 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/CorrelatedNestedLoopJoinNode.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/CorrelatedNestedLoopJoinNode.java
@@ -78,6 +78,10 @@ public class CorrelatedNestedLoopJoinNode<RowT> extends
AbstractNode<RowT> {
INITIAL, FILLING_LEFT, FILLING_RIGHT, IDLE, IN_LOOP, END
}
+ // Metrics
+ private long receivedRowsFromLeft = 0L;
+ private long receiveRowsFromRight = 0L;
+
/**
* Creates CorrelatedNestedLoopJoin node.
*
@@ -121,6 +125,8 @@ public class CorrelatedNestedLoopJoinNode<RowT> extends
AbstractNode<RowT> {
assert !nullOrEmpty(sources()) && sources().size() == 2;
assert rowsCnt > 0 && requested == 0;
+ onRequestReceived();
+
requested = rowsCnt;
onRequest();
@@ -203,6 +209,8 @@ public class CorrelatedNestedLoopJoinNode<RowT> extends
AbstractNode<RowT> {
assert downstream() != null;
assert waitingLeft > 0;
+ onRowReceivedFromLeft();
+
waitingLeft--;
if (leftInBuf == null) {
@@ -218,6 +226,8 @@ public class CorrelatedNestedLoopJoinNode<RowT> extends
AbstractNode<RowT> {
assert downstream() != null;
assert waitingRight > 0;
+ onRowReceivedFromRight();
+
waitingRight--;
if (rightInBuf == null) {
@@ -503,4 +513,22 @@ public class CorrelatedNestedLoopJoinNode<RowT> extends
AbstractNode<RowT> {
}
}
}
+
+ @Override
+ protected void dumpMetrics0(IgniteStringBuilder writer) {
+ // Calculate aggregated statistics.
+ onRowsReceived(receivedRowsFromLeft + receiveRowsFromRight);
+
+ super.dumpMetrics0(writer);
+ writer.app(", leftRows=").app(receivedRowsFromLeft)
+ .app(", rightRows=").app(receiveRowsFromRight);
+ }
+
+ private void onRowReceivedFromLeft() {
+ receivedRowsFromLeft++;
+ }
+
+ private void onRowReceivedFromRight() {
+ receiveRowsFromRight++;
+ }
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/FilterNode.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/FilterNode.java
index ce26c1beaf3..21e0246fc89 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/FilterNode.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/FilterNode.java
@@ -40,6 +40,9 @@ public class FilterNode<RowT> extends AbstractNode<RowT>
implements SingleNode<R
private boolean inLoop;
+ // Metrics
+ private long filteredRows;
+
/**
* Constructor.
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
@@ -59,6 +62,8 @@ public class FilterNode<RowT> extends AbstractNode<RowT>
implements SingleNode<R
assert !nullOrEmpty(sources()) && sources().size() == 1;
assert rowsCnt > 0 && requested == 0;
+ onRequestReceived();
+
requested = rowsCnt;
if (!inLoop) {
@@ -72,10 +77,14 @@ public class FilterNode<RowT> extends AbstractNode<RowT>
implements SingleNode<R
assert downstream() != null;
assert waiting > 0;
+ onRowReceived();
+
waiting--;
if (pred.test(row)) {
inBuf.add(row);
+ } else {
+ onRowFiltered();
}
filter();
@@ -147,4 +156,14 @@ public class FilterNode<RowT> extends AbstractNode<RowT>
implements SingleNode<R
.app(", requested=").app(requested)
.app(", waiting=").app(waiting);
}
+
+ @Override
+ protected void dumpMetrics0(IgniteStringBuilder writer) {
+ super.dumpMetrics0(writer);
+ writer.app(", filteredRows=").app(filteredRows);
+ }
+
+ private void onRowFiltered() {
+ filteredRows++;
+ }
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/HashAggregateNode.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/HashAggregateNode.java
index 0373336600b..ac608616065 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/HashAggregateNode.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/HashAggregateNode.java
@@ -102,6 +102,8 @@ public class HashAggregateNode<RowT> extends
AbstractNode<RowT> implements Singl
assert rowsCnt > 0 && requested == 0;
assert waiting <= 0;
+ onRequestReceived();
+
requested = rowsCnt;
if (waiting == 0) {
@@ -117,6 +119,8 @@ public class HashAggregateNode<RowT> extends
AbstractNode<RowT> implements Singl
assert downstream() != null;
assert waiting > 0;
+ onRowReceived();
+
waiting--;
for (Grouping grouping : groupings) {
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java
index 5c2534de4f1..ab6331cb2b6 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java
@@ -20,6 +20,8 @@ package org.apache.ignite.internal.sql.engine.exec.rel;
import static org.apache.calcite.util.Util.unexpected;
import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
+import it.unimi.dsi.fastutil.ints.Int2LongArrayMap;
+import it.unimi.dsi.fastutil.ints.Int2LongMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
@@ -116,6 +118,8 @@ public class Inbox<RowT> extends AbstractNode<RowT>
implements Mailbox<RowT>, Si
public void request(int rowsCnt) throws Exception {
assert rowsCnt > 0 && requested == 0;
+ onRequestReceived();
+
requested = rowsCnt;
if (!inLoop) {
@@ -542,6 +546,14 @@ public class Inbox<RowT> extends AbstractNode<RowT>
implements Mailbox<RowT>, Si
*/
private @Nullable SharedState sharedStateHolder = null;
+ // Metrics
+ Int2LongMap batchTimestamps = new Int2LongArrayMap(IO_BATCH_CNT);
+ long rowsReceived = 0L;
+ long batchesRequested = 0L;
+ long requestTotalTime = 0L;
+ long requestMinTime = Long.MAX_VALUE;
+ long requestMaxTime = 0L;
+
private RemoteSource(BatchRequester batchRequester) {
this.batchRequester = batchRequester;
}
@@ -555,6 +567,7 @@ public class Inbox<RowT> extends AbstractNode<RowT>
implements Mailbox<RowT>, Si
void reset(SharedState state) {
sharedStateHolder = state;
batches.clear();
+ batchTimestamps.clear();
this.lastEnqueued = lastRequested;
this.state = State.WAITING;
@@ -570,6 +583,8 @@ public class Inbox<RowT> extends AbstractNode<RowT>
implements Mailbox<RowT>, Si
return;
}
+ batchReceived(id, rows.size());
+
batches.offer(new Batch<>(id, last, rows));
if (state == State.WAITING && id == lastEnqueued + 1) {
@@ -588,6 +603,8 @@ public class Inbox<RowT> extends AbstractNode<RowT>
implements Mailbox<RowT>, Si
if (maxInFlightCount / 2 >= currentInFlightCount) {
int countOfBatches = maxInFlightCount - currentInFlightCount;
+ onBatchesRequested(countOfBatches);
+
lastRequested += countOfBatches;
batchRequester.request(countOfBatches, sharedStateHolder);
@@ -654,5 +671,52 @@ public class Inbox<RowT> extends AbstractNode<RowT>
implements Mailbox<RowT>, Si
lastEnqueued = curr.batchId;
}
+
+ void dumpMetrics(IgniteStringBuilder writer, String indent, String
nodeName) {
+ writer.app(indent).app("Remote node: nodeName=").app(nodeName)
+ .app(", receivedRows=").app(rowsReceived)
+ .app(", batchesRequested=").app(batchesRequested);
+
+ if (batchesRequested > 0) {
+ writer.app(",
requestTime=[agv=").app(MetricsAwareNode.beautifyNanoTime(requestTotalTime /
batchesRequested))
+ .app(",
min=").app(MetricsAwareNode.beautifyNanoTime(requestMinTime))
+ .app(",
max=").app(MetricsAwareNode.beautifyNanoTime(requestMaxTime)).app("]");
+ }
+ }
+
+ private void onBatchesRequested(int countOfBatches) {
+ long timestamp = System.nanoTime();
+ batchesRequested += countOfBatches;
+ for (int i = 1; i <= countOfBatches; i++) {
+ batchTimestamps.put(lastRequested + i, timestamp);
+ }
+ }
+
+ private void batchReceived(int id, int rows) {
+ this.rowsReceived += rows;
+
+ long time = System.nanoTime() - batchTimestamps.remove(id);
+ this.requestTotalTime += time;
+ this.requestMinTime = Math.min(time, requestMinTime);
+ this.requestMaxTime = Math.max(time, requestMaxTime);
+ }
+ }
+
+ @Override
+ @TestOnly
+ public void dumpNodeMetrics(IgniteStringBuilder writer, String indent) {
+ // Calculate aggregated statistics.
+ onRowsReceived(perNodeBuffers.values().stream().mapToLong(n ->
n.rowsReceived).sum());
+
+ writer.app(indent);
+ dumpMetrics0(writer);
+ writer.app(", fragmentId=").app(srcFragmentId);
+ writer.nl();
+
+ String childIndent = Debuggable.childIndentation(indent);
+ for (String nodeName : srcNodeNames) {
+ perNodeBuffers.get(nodeName).dumpMetrics(writer, childIndent,
nodeName);
+ writer.nl();
+ }
}
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java
index 38d4097b5e3..cd423dcab75 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java
@@ -27,6 +27,7 @@ import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.calcite.util.ImmutableIntList;
+import org.apache.ignite.internal.lang.IgniteStringBuilder;
import org.apache.ignite.internal.sql.engine.api.expressions.RowFactory;
import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
import org.apache.ignite.internal.sql.engine.exec.PartitionProvider;
@@ -157,4 +158,10 @@ public class IndexScanNode<RowT> extends
StorageScanNode<RowT> {
throw new AssertionError("Unexpected index type: " +
schemaIndex.type());
}
}
+
+ @Override
+ protected void dumpMetrics0(IgniteStringBuilder writer) {
+ super.dumpMetrics0(writer);
+ writer.app(", indexName=").app(schemaIndex.name());
+ }
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexSpoolNode.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexSpoolNode.java
index 6ba21783076..4d4f26c4626 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexSpoolNode.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexSpoolNode.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.sql.engine.exec.rel;
import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
import java.util.Comparator;
+import java.util.List;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.calcite.rel.RelCollation;
@@ -81,6 +82,8 @@ public class IndexSpoolNode<RowT> extends AbstractNode<RowT>
implements SingleNo
/** {@inheritDoc} */
@Override
public void rewind() {
+ onRewind();
+
rewindInternal();
}
@@ -100,6 +103,8 @@ public class IndexSpoolNode<RowT> extends
AbstractNode<RowT> implements SingleNo
assert !nullOrEmpty(sources()) && sources().size() == 1;
assert rowsCnt > 0;
+ onRequestReceived();
+
if (!indexReady()) {
requested = rowsCnt;
@@ -112,6 +117,8 @@ public class IndexSpoolNode<RowT> extends
AbstractNode<RowT> implements SingleNo
/** {@inheritDoc} */
@Override
public void push(RowT row) throws Exception {
+ onRowReceived();
+
idx.push(row);
waiting--;
@@ -205,4 +212,13 @@ public class IndexSpoolNode<RowT> extends
AbstractNode<RowT> implements SingleNo
return new IndexSpoolNode<>(ctx, idx, scan);
}
+
+ @Override
+ public void dumpNodeMetrics(IgniteStringBuilder writer, String indent) {
+ writer.app(indent);
+ dumpMetrics0(writer);
+ writer.nl();
+
+ MetricsAwareNode.dumpChildNodesMetrics(writer, indent, List.of(scan));
+ }
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/LimitNode.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/LimitNode.java
index 48f00e9026b..ed7757c4b8f 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/LimitNode.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/LimitNode.java
@@ -43,6 +43,9 @@ public class LimitNode<RowT> extends AbstractNode<RowT>
implements SingleNode<Ro
/** Upper requested rows. */
private int requested;
+ // Metrics
+ private long filteredRows;
+
/**
* Constructor.
*
@@ -66,6 +69,8 @@ public class LimitNode<RowT> extends AbstractNode<RowT>
implements SingleNode<Ro
assert !nullOrEmpty(sources()) && sources().size() == 1;
assert rowsCnt > 0;
+ onRequestReceived();
+
if (!hasMoreData()) {
end();
@@ -91,18 +96,21 @@ public class LimitNode<RowT> extends AbstractNode<RowT>
implements SingleNode<Ro
/** {@inheritDoc} */
@Override
public void push(RowT row) throws Exception {
+ onRowReceived();
+
if (waiting == NOT_WAITING) {
+ onRowFiltered();
return;
}
--waiting;
- if (rowsProcessed >= offset) {
- if (hasMoreData()) {
- // this two rows can`t be swapped, cause if all requested rows
have been pushed it will trigger further request call.
- --requested;
- downstream().push(row);
- }
+ if (rowsProcessed >= offset && hasMoreData()) {
+ // this two rows can`t be swapped, cause if all requested rows
have been pushed it will trigger further request call.
+ --requested;
+ downstream().push(row);
+ } else {
+ onRowFiltered();
}
++rowsProcessed;
@@ -120,6 +128,10 @@ public class LimitNode<RowT> extends AbstractNode<RowT>
implements SingleNode<Ro
}
}
+ private long onRowFiltered() {
+ return filteredRows++;
+ }
+
/** {@inheritDoc} */
@Override
public void end() throws Exception {
@@ -162,6 +174,12 @@ public class LimitNode<RowT> extends AbstractNode<RowT>
implements SingleNode<Ro
.app(", rowsProcessed=").app(rowsProcessed);
}
+ @Override
+ protected void dumpMetrics0(IgniteStringBuilder writer) {
+ super.dumpMetrics0(writer);
+ writer.app(", filteredRows=").app(filteredRows);
+ }
+
/** {@code True} if fetch is undefined, or current rows processed is less
than required. */
private boolean hasMoreData() {
return fetchUndefined || rowsProcessed < fetch + offset;
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinNode.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinNode.java
index 3e0c40c9b00..ada91778f69 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinNode.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinNode.java
@@ -53,6 +53,10 @@ public abstract class MergeJoinNode<RowT> extends
AbstractNode<RowT> {
protected boolean inLoop;
+ // Metrics
+ private long receivedRowsFromLeft = 0L;
+ private long receiveRowsFromRight = 0L;
+
/**
* Creates MergeJoinNode.
*
@@ -71,6 +75,8 @@ public abstract class MergeJoinNode<RowT> extends
AbstractNode<RowT> {
assert !nullOrEmpty(sources()) && sources().size() == 2;
assert rowsCnt > 0 && requested == 0;
+ onRequestReceived();
+
requested = rowsCnt;
if (!inLoop) {
@@ -149,6 +155,8 @@ public abstract class MergeJoinNode<RowT> extends
AbstractNode<RowT> {
assert downstream() != null;
assert waitingLeft > 0;
+ onRowReceivedFromLeft();
+
waitingLeft--;
leftInBuf.add(row);
@@ -162,6 +170,8 @@ public abstract class MergeJoinNode<RowT> extends
AbstractNode<RowT> {
assert downstream() != null;
assert waitingRight > 0;
+ onRowReceivedFromRight();
+
waitingRight--;
rightInBuf.add(row);
@@ -1260,4 +1270,22 @@ public abstract class MergeJoinNode<RowT> extends
AbstractNode<RowT> {
}
}
}
+
+ @Override
+ protected void dumpMetrics0(IgniteStringBuilder writer) {
+ // Calculate aggregated statistics.
+ onRowsReceived(receivedRowsFromLeft + receiveRowsFromRight);
+
+ super.dumpMetrics0(writer);
+ writer.app(", leftRows=").app(receivedRowsFromLeft)
+ .app(", rightRows=").app(receiveRowsFromRight);
+ }
+
+ private void onRowReceivedFromLeft() {
+ receivedRowsFromLeft++;
+ }
+
+ private void onRowReceivedFromRight() {
+ receiveRowsFromRight++;
+ }
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/MetricsAwareNode.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/MetricsAwareNode.java
new file mode 100644
index 00000000000..34cde41e12d
--- /dev/null
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/MetricsAwareNode.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.rel;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.lang.Debuggable;
+import org.apache.ignite.internal.lang.IgniteStringBuilder;
+
+/**
+ * Interface provides method for dumping query metrics.
+ */
+interface MetricsAwareNode {
+ /** Helper method to dump child nodes metrics. */
+ static void dumpChildNodesMetrics(IgniteStringBuilder writer, String
indent, Iterable<? extends MetricsAwareNode> nodes) {
+ if (nodes != null) {
+ String childIndent = Debuggable.childIndentation(indent);
+ nodes.forEach(s -> s.dumpNodeMetrics(writer, childIndent));
+ }
+ }
+
+ /** Helper method for converting time in nanos to a string. */
+ static String beautifyNanoTime(long nanos) {
+ return TimeUnit.NANOSECONDS.toMillis(nanos) + "ms";
+ }
+
+ /**
+ * Dump current node metrics to a given writer.
+ * Note: node should bother writing provided indentation at each new line.
+ */
+ void dumpNodeMetrics(IgniteStringBuilder writer, String indent);
+}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNode.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNode.java
index 1a0412ffea0..54b3af227ab 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNode.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNode.java
@@ -157,6 +157,8 @@ public class ModifyNode<RowT> extends AbstractNode<RowT>
implements SingleNode<R
assert !nullOrEmpty(sources()) && sources().size() == 1;
assert rowsCnt > 0 && requested == 0;
+ onRequestReceived();
+
requested = rowsCnt;
requestNextBatchIfNeeded();
@@ -168,6 +170,8 @@ public class ModifyNode<RowT> extends AbstractNode<RowT>
implements SingleNode<R
assert downstream() != null;
assert waiting > 0;
+ onRowReceived();
+
waiting--;
rows.add(row);
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Node.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Node.java
index 23934da2f05..a10cb19b364 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Node.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Node.java
@@ -28,7 +28,7 @@ import
org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
* <b>Note</b>: except several cases (like consumer node and mailboxes),
{@link Node#request(int)}, {@link Node#close()},
* {@link Downstream#push(Object)} and {@link Downstream#end()} methods should
be used from one single thread.
*/
-public interface Node<RowT> extends AutoCloseable, Debuggable {
+public interface Node<RowT> extends AutoCloseable, Debuggable,
MetricsAwareNode {
/**
* Returns runtime context allowing access to the tables in a database.
*
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java
index 1e143cb973e..3411c035cf7 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java
@@ -159,6 +159,8 @@ public class Outbox<RowT> extends AbstractNode<RowT>
implements Mailbox<RowT>, S
public void push(RowT row) throws Exception {
assert waiting > 0 : waiting;
+ onRowReceived();
+
waiting--;
if (currentNode == null || dest.targets(row).contains(currentNode)) {
@@ -176,6 +178,10 @@ public class Outbox<RowT> extends AbstractNode<RowT>
implements Mailbox<RowT>, S
waiting = NOT_WAITING;
flush();
+
+ if (ExecutionContext.DUMP_METRICS) {
+ dumpFragmentMetrics();
+ }
}
/** {@inheritDoc} */
@@ -189,6 +195,10 @@ public class Outbox<RowT> extends AbstractNode<RowT>
implements Mailbox<RowT>, S
/** {@inheritDoc} */
@Override
public void closeInternal() {
+ if (waiting != NOT_WAITING && ExecutionContext.DUMP_METRICS) {
+ dumpFragmentMetrics();
+ }
+
super.closeInternal();
registry.unregister(this);
@@ -496,6 +506,11 @@ public class Outbox<RowT> extends AbstractNode<RowT>
implements Mailbox<RowT>, S
private @Nullable List<RowT> curr;
private int pendingCount;
+ // Metrics
+ long batchesRequested = 0L;
+ long rowsReceived = 0L;
+ long rowsSent = 0L;
+
private RemoteDownstream(String nodeName, BatchSender<RowT> sender) {
this.nodeName = nodeName;
this.sender = sender;
@@ -519,6 +534,8 @@ public class Outbox<RowT> extends AbstractNode<RowT>
implements Mailbox<RowT>, S
void onBatchRequested(int amountOfBatches) throws Exception {
assert amountOfBatches > 0 : amountOfBatches;
+ batchesRequested += amountOfBatches;
+
this.pendingCount += amountOfBatches;
// if there is a batch which is ready to be sent, then just sent it
@@ -541,6 +558,8 @@ public class Outbox<RowT> extends AbstractNode<RowT>
implements Mailbox<RowT>, S
assert ready() : state;
assert curr != null;
+ rowsReceived++;
+
curr.add(row);
if (curr.size() == IO_BATCH_SIZE) {
@@ -560,6 +579,8 @@ public class Outbox<RowT> extends AbstractNode<RowT>
implements Mailbox<RowT>, S
boolean lastBatch = state == State.LAST_BATCH;
+ rowsSent += curr.size();
+
sender.send(nodeName, ++lastSentBatchId, lastBatch, curr);
pendingCount--;
@@ -612,4 +633,37 @@ public class Outbox<RowT> extends AbstractNode<RowT>
implements Mailbox<RowT>, S
this.amountOfBatches = amountOfBatches;
}
}
+
+ private void dumpFragmentMetrics() {
+ IgniteStringBuilder sb = new IgniteStringBuilder("Dump metrics for
executed query:")
+ .app(" queryId=").app(context().queryId())
+ .app(", fragmentId=").app(context().fragmentId())
+ .app(" nodeName=").app(context().localNode().name())
+ .nl();
+
+ dumpNodeMetrics(sb, "");
+
+ LOG.info(sb.toString());
+ }
+
+ @Override
+ public void dumpNodeMetrics(IgniteStringBuilder writer, String indent) {
+ writer.app(indent);
+ dumpMetrics0(writer);
+ writer.nl();
+
+ String childIndent = Debuggable.childIndentation("");
+ String childIndent2 = Debuggable.childIndentation(childIndent);
+
+ for (Entry<String, RemoteDownstream<RowT>> entry :
this.nodeBuffers.entrySet()) {
+ RemoteDownstream<RowT> downstream = entry.getValue();
+ writer.app(childIndent2)
+ .app("Node: nodeName=").app(entry.getKey())
+ .app(", rowsSent=").app(downstream.rowsSent)
+ .app(",
batchesRequested=").app(downstream.batchesRequested);
+ writer.nl();
+ }
+
+ MetricsAwareNode.dumpChildNodesMetrics(writer, childIndent, sources());
+ }
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ProjectNode.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ProjectNode.java
index 376c96ae4ad..ea731e973e1 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ProjectNode.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ProjectNode.java
@@ -55,6 +55,8 @@ public class ProjectNode<RowT> extends AbstractNode<RowT>
implements SingleNode<
assert !nullOrEmpty(sources()) && sources().size() == 1;
assert rowsCnt > 0;
+ onRequestReceived();
+
source().request(rowsCnt);
}
@@ -63,6 +65,8 @@ public class ProjectNode<RowT> extends AbstractNode<RowT>
implements SingleNode<
public void push(RowT row) throws Exception {
assert downstream() != null;
+ onRowReceived();
+
downstream().push(prj.apply(row));
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/RootNode.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/RootNode.java
index fe17d26c01f..b5f0e438f51 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/RootNode.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/RootNode.java
@@ -141,6 +141,8 @@ public class RootNode<RowT> extends AbstractNode<RowT>
implements SingleNode<Row
try {
assert waiting > 0;
+ onRowReceived();
+
waiting--;
inBuff.offer(row);
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ScanNode.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ScanNode.java
index 36632a0935b..7291b452e02 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ScanNode.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ScanNode.java
@@ -66,6 +66,8 @@ public class ScanNode<RowT> extends AbstractNode<RowT>
implements SingleNode<Row
public void request(int rowsCnt) throws Exception {
assert rowsCnt > 0 && requested == 0 : "rowsCnt=" + rowsCnt + ",
requested=" + requested;
+ onRequestReceived();
+
requested = rowsCnt;
if (!inLoop) {
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/SortAggregateNode.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/SortAggregateNode.java
index 6ea047b2d08..1c46c80f8cb 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/SortAggregateNode.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/SortAggregateNode.java
@@ -105,6 +105,8 @@ public class SortAggregateNode<RowT> extends
AbstractNode<RowT> implements Singl
assert !nullOrEmpty(sources()) && sources().size() == 1;
assert rowsCnt > 0 && requested == 0;
+ onRequestReceived();
+
requested = rowsCnt;
if (waiting == 0) {
@@ -120,6 +122,8 @@ public class SortAggregateNode<RowT> extends
AbstractNode<RowT> implements Singl
assert downstream() != null;
assert waiting > 0;
+ onRowReceived();
+
waiting--;
if (grp != null) {
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/SortNode.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/SortNode.java
index e9b9a6bed36..5900270c3cd 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/SortNode.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/SortNode.java
@@ -126,6 +126,8 @@ public class SortNode<RowT> extends AbstractNode<RowT>
implements SingleNode<Row
assert rowsCnt > 0 && requested == 0;
assert waiting <= 0;
+ onRequestReceived();
+
if (fetch == 0) {
downstream().end();
@@ -148,6 +150,8 @@ public class SortNode<RowT> extends AbstractNode<RowT>
implements SingleNode<Row
assert waiting > 0;
assert reversed == null || reversed.isEmpty();
+ onRowReceived();
+
waiting--;
rows.add(row);
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/StorageScanNode.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/StorageScanNode.java
index ae248273261..a131d31907b 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/StorageScanNode.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/StorageScanNode.java
@@ -36,7 +36,7 @@ import org.jetbrains.annotations.Nullable;
public abstract class StorageScanNode<RowT> extends AbstractNode<RowT> {
private Queue<RowT> inBuff = new LinkedBlockingQueue<>(inBufSize);
- private final @Nullable Predicate<RowT> filters;
+ private final Predicate<RowT> filters;
private final @Nullable Function<RowT, RowT> rowTransformer;
@@ -51,6 +51,11 @@ public abstract class StorageScanNode<RowT> extends
AbstractNode<RowT> {
/** Flag that indicate scan method was called already. */
private boolean dataRequested;
+ // Metrics
+ private long filteredRows = 0L;
+ private long scanStartTime = -1L;
+ private long scanTime = 0L;
+
/**
* Constructor.
*
@@ -76,6 +81,8 @@ public abstract class StorageScanNode<RowT> extends
AbstractNode<RowT> {
public void request(int rowsCnt) throws Exception {
assert rowsCnt > 0 && requested == 0 : "rowsCnt=" + rowsCnt + ",
requested=" + requested;
+ onRequestReceived();
+
requested = rowsCnt;
if (!inLoop) {
@@ -136,6 +143,7 @@ public abstract class StorageScanNode<RowT> extends
AbstractNode<RowT> {
RowT row = inBuff.poll();
if (filters != null && !filters.test(row)) {
+ onRowFiltered();
continue;
}
@@ -180,6 +188,8 @@ public abstract class StorageScanNode<RowT> extends
AbstractNode<RowT> {
waiting = inBufSize - inBuff.size();
}
+ onScanStarted();
+
Subscription subscription = this.activeSubscription;
if (subscription != null) {
subscription.request(waiting);
@@ -233,8 +243,12 @@ public abstract class StorageScanNode<RowT> extends
AbstractNode<RowT> {
// It is safe not to be aware about already closed execution flow.
inBuffInner.add(row);
- if (inBuffInner.size() == inBufSize) {
+ int size = inBuffInner.size();
+ if (size == inBufSize) {
StorageScanNode.this.execute(() -> {
+ onRowsReceived(size);
+ onScanFinished();
+
waiting = 0;
push();
});
@@ -253,6 +267,9 @@ public abstract class StorageScanNode<RowT> extends
AbstractNode<RowT> {
@Override
public void onComplete() {
StorageScanNode.this.execute(() -> {
+ onRowsReceived(inBuff.size());
+ onScanFinished();
+
activeSubscription = null;
waiting = 0;
@@ -260,4 +277,38 @@ public abstract class StorageScanNode<RowT> extends
AbstractNode<RowT> {
});
}
}
+
+ @Override
+ protected void dumpMetrics0(IgniteStringBuilder writer) {
+ writer.app(this.getClass().getSimpleName()).app(": ");
+
+ writer.app("scannedRows=").app(receivedRowsCount);
+
+ if (requestCount > 0) {
+ writer.app(", requests=").app(requestCount);
+ }
+
+ if (rewindCount > 0) {
+ writer.app(", rewinds=").app(rewindCount);
+ }
+
+ if (filteredRows > 0) {
+ writer.app(", filteredRows=").app(filteredRows);
+ }
+
+ writer.app(",
scanTime=").app(MetricsAwareNode.beautifyNanoTime(scanTime));
+ }
+
+ private void onRowFiltered() {
+ filteredRows++;
+ }
+
+ private void onScanStarted() {
+ scanStartTime = System.nanoTime();
+ }
+
+ private void onScanFinished() {
+ scanTime += System.nanoTime() - scanStartTime;
+ scanStartTime = -1L;
+ }
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNode.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNode.java
index 25e6500ef79..afc559a5331 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNode.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNode.java
@@ -23,11 +23,13 @@ import java.util.concurrent.Flow.Publisher;
import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.calcite.util.ImmutableIntList;
+import org.apache.ignite.internal.lang.IgniteStringBuilder;
import org.apache.ignite.internal.sql.engine.api.expressions.RowFactory;
import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
import org.apache.ignite.internal.sql.engine.exec.PartitionProvider;
import
org.apache.ignite.internal.sql.engine.exec.PartitionWithConsistencyToken;
import org.apache.ignite.internal.sql.engine.exec.ScannableTable;
+import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
import org.apache.ignite.internal.util.SubscriptionUtils;
import org.apache.ignite.internal.util.TransformingIterator;
import org.jetbrains.annotations.Nullable;
@@ -47,11 +49,14 @@ public class TableScanNode<RowT> extends
StorageScanNode<RowT> {
private final int @Nullable [] requiredColumns;
+ private final String tableName;
+
/**
* Constructor.
*
* @param ctx Execution context.
* @param rowFactory Row factory.
+ * @param schemaTable Schema table.
* @param table Internal table.
* @param partitionProvider List of pairs containing the partition number
to scan with the corresponding enlistment
* consistency token.
@@ -62,6 +67,7 @@ public class TableScanNode<RowT> extends
StorageScanNode<RowT> {
public TableScanNode(
ExecutionContext<RowT> ctx,
RowFactory<RowT> rowFactory,
+ IgniteTable schemaTable,
ScannableTable table,
PartitionProvider<RowT> partitionProvider,
@Nullable Predicate<RowT> filters,
@@ -74,6 +80,7 @@ public class TableScanNode<RowT> extends
StorageScanNode<RowT> {
this.partitionProvider = partitionProvider;
this.rowFactory = rowFactory;
this.requiredColumns = requiredColumns == null ? null :
requiredColumns.toIntArray();
+ this.tableName = schemaTable.name();
}
/** {@inheritDoc} */
@@ -86,4 +93,10 @@ public class TableScanNode<RowT> extends
StorageScanNode<RowT> {
return SubscriptionUtils.concat(it);
}
+
+ @Override
+ protected void dumpMetrics0(IgniteStringBuilder writer) {
+ super.dumpMetrics0(writer);
+ writer.app(", tableName=").app(tableName);
+ }
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/TableSpoolNode.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/TableSpoolNode.java
index ec45a488875..ce793912c54 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/TableSpoolNode.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/TableSpoolNode.java
@@ -77,6 +77,8 @@ public class TableSpoolNode<RowT> extends AbstractNode<RowT>
implements SingleNo
/** {@inheritDoc} */
@Override
public void rewind() {
+ onRewind();
+
rewindInternal();
}
@@ -142,6 +144,8 @@ public class TableSpoolNode<RowT> extends
AbstractNode<RowT> implements SingleNo
assert downstream() != null;
assert waiting > 0;
+ onRowReceived();
+
waiting--;
rows.add(row);
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/UnionAllNode.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/UnionAllNode.java
index 1fbfe213bd7..d41e7bd074c 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/UnionAllNode.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/UnionAllNode.java
@@ -56,6 +56,8 @@ public class UnionAllNode<RowT> extends AbstractNode<RowT>
implements Downstream
assert !nullOrEmpty(sources());
assert rowsCnt > 0 && waiting == 0;
+ onRequestReceived();
+
source().request(waiting = rowsCnt);
}
@@ -65,6 +67,8 @@ public class UnionAllNode<RowT> extends AbstractNode<RowT>
implements Downstream
assert downstream() != null;
assert waiting > 0;
+ onRowReceived();
+
waiting--;
downstream().push(row);
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNodeTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNodeTest.java
index 45109a745b6..d4e1adc4d5a 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNodeTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNodeTest.java
@@ -80,7 +80,7 @@ class AsyncRootNodeTest extends
AbstractExecutionTest<RowWrapper> {
null
);
- var rootNode = new AsyncRootNode<>(dataSourceScanNode,
Function.identity());
+ var rootNode = new AsyncRootNode<>(context, dataSourceScanNode,
Function.identity());
rootNode.requestNextAsync(1);
@@ -123,7 +123,7 @@ class AsyncRootNodeTest extends
AbstractExecutionTest<RowWrapper> {
null
);
- AsyncRootNode<RowWrapper, RowWrapper> rootNode = new
AsyncRootNode<>(dataSourceScanNode, Function.identity());
+ AsyncRootNode<RowWrapper, RowWrapper> rootNode = new
AsyncRootNode<>(context, dataSourceScanNode, Function.identity());
dataSourceScanNode.onRegister(rootNode);
// trigger prefetch
@@ -171,7 +171,7 @@ class AsyncRootNodeTest extends
AbstractExecutionTest<RowWrapper> {
return IntStream.range(0, 76).mapToObj(factory::create).iterator();
});
- AsyncRootNode<RowWrapper, RowWrapper> rootNode = new
AsyncRootNode<>(scanNode, Function.identity());
+ AsyncRootNode<RowWrapper, RowWrapper> rootNode = new
AsyncRootNode<>(context, scanNode, Function.identity());
scanNode.onRegister(rootNode);
// trigger prefetch
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExchangeExecutionTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExchangeExecutionTest.java
index 403422e9d72..614981419ba 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExchangeExecutionTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExchangeExecutionTest.java
@@ -531,7 +531,7 @@ public class ExchangeExecutionTest extends
AbstractExecutionTest<Object[]> {
}
RewindableAsyncRoot<Object[], Object[]> root = new
RewindableAsyncRoot<>(
- node, Function.identity()
+ targetCtx, node, Function.identity()
);
node.onRegister(root);
@@ -665,8 +665,8 @@ public class ExchangeExecutionTest extends
AbstractExecutionTest<Object[]> {
* @param source A source to requests rows from.
* @param converter A converter to convert rows from an internal
format to desired output format.
*/
- RewindableAsyncRoot(AbstractNode<InT> source, Function<InT, OutT>
converter) {
- super(source, converter);
+ RewindableAsyncRoot(ExecutionContext<InT> ctx, AbstractNode<InT>
source, Function<InT, OutT> converter) {
+ super(ctx, source, converter);
}
@Override
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExecutionTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExecutionTest.java
index 8394a75b869..fb61d4e6ebe 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExecutionTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExecutionTest.java
@@ -427,6 +427,11 @@ public class ExecutionTest extends
AbstractExecutionTest<Object[]> {
public void dumpState(IgniteStringBuilder writer, String indent) {
writer.app(indent).app("class=").app(getClass().getSimpleName()).nl();
}
+
+ @Override
+ public void dumpNodeMetrics(IgniteStringBuilder writer, String indent)
{
+ writer.app(indent).app(getClass().getSimpleName()).nl();
+ }
}
@Override
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
index 8f682a761f9..fdc914c291b 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
@@ -79,6 +79,7 @@ import
org.apache.ignite.internal.sql.engine.exec.TableRowConverter;
import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler;
import org.apache.ignite.internal.sql.engine.framework.DataProvider;
import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
+import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
import org.apache.ignite.internal.sql.engine.util.TypeUtils;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
@@ -228,7 +229,8 @@ public class TableScanNodeExecutionTest extends
AbstractExecutionTest<Object[]>
};
ScannableTableImpl scanableTable = new
ScannableTableImpl(internalTable, rf -> rowConverter);
PartitionProvider<Object[]> partitionProvider =
PartitionProvider.fromPartitions(partsWithConsistencyTokens);
- TableScanNode<Object[]> scanNode = new TableScanNode<>(ctx,
rowFactory, scanableTable,
+ IgniteTable schemaTable = mock(IgniteTable.class);
+ TableScanNode<Object[]> scanNode = new TableScanNode<>(ctx,
rowFactory, schemaTable, scanableTable,
partitionProvider, null, null, null);
RootNode<Object[]> root = new RootNode<>(ctx);
@@ -282,7 +284,9 @@ public class TableScanNodeExecutionTest extends
AbstractExecutionTest<Object[]>
RowFactory<Object[]> rowFactory =
ctx.rowFactoryFactory().create(schema);
ScannableTable scannableTable =
TestBuilders.tableScan(DataProvider.fromRow(new Object[]{42}, partDataSize));
- TableScanNode<Object[]> scanNode = new TableScanNode<>(ctx,
rowFactory, scannableTable, c -> partitions, null, null, null);
+ IgniteTable schemaTable = mock(IgniteTable.class);
+ TableScanNode<Object[]> scanNode = new TableScanNode<>(ctx,
rowFactory, schemaTable, scannableTable,
+ c -> partitions, null, null, null);
RootNode<Object[]> rootNode = new RootNode<>(ctx);
rootNode.register(scanNode);