This is an automated email from the ASF dual-hosted git repository. amashenkov pushed a commit to branch ignite-27981 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit a370bf6e63f4594332b690f727649be014f0bf8e Author: AMashenkov <[email protected]> AuthorDate: Wed Feb 25 15:26:48 2026 +0300 wip --- .../internal/sql/engine/exec/ExecutionContext.java | 3 + .../sql/engine/exec/ExecutionServiceImpl.java | 1 + .../sql/engine/exec/LogicalRelImplementor.java | 1 + .../internal/sql/engine/exec/rel/AbstractNode.java | 46 ++++++++++++++ .../rel/AbstractRightMaterializedJoinNode.java | 28 +++++++++ .../sql/engine/exec/rel/AbstractSetOpNode.java | 4 ++ .../sql/engine/exec/rel/AsyncRootNode.java | 70 +++++++++++++++++++++- .../exec/rel/CorrelatedNestedLoopJoinNode.java | 28 +++++++++ .../internal/sql/engine/exec/rel/FilterNode.java | 19 ++++++ .../sql/engine/exec/rel/HashAggregateNode.java | 4 ++ .../ignite/internal/sql/engine/exec/rel/Inbox.java | 64 ++++++++++++++++++++ .../sql/engine/exec/rel/IndexScanNode.java | 7 +++ .../sql/engine/exec/rel/IndexSpoolNode.java | 16 +++++ .../internal/sql/engine/exec/rel/LimitNode.java | 30 ++++++++-- .../sql/engine/exec/rel/MergeJoinNode.java | 28 +++++++++ .../sql/engine/exec/rel/MetricsAwareNode.java | 46 ++++++++++++++ .../internal/sql/engine/exec/rel/ModifyNode.java | 4 ++ .../ignite/internal/sql/engine/exec/rel/Node.java | 2 +- .../internal/sql/engine/exec/rel/Outbox.java | 54 +++++++++++++++++ .../internal/sql/engine/exec/rel/ProjectNode.java | 4 ++ .../internal/sql/engine/exec/rel/RootNode.java | 2 + .../internal/sql/engine/exec/rel/ScanNode.java | 2 + .../sql/engine/exec/rel/SortAggregateNode.java | 4 ++ .../internal/sql/engine/exec/rel/SortNode.java | 4 ++ .../sql/engine/exec/rel/StorageScanNode.java | 55 ++++++++++++++++- .../sql/engine/exec/rel/TableScanNode.java | 13 ++++ .../sql/engine/exec/rel/TableSpoolNode.java | 4 ++ .../internal/sql/engine/exec/rel/UnionAllNode.java | 4 ++ .../sql/engine/exec/rel/AsyncRootNodeTest.java | 6 +- .../sql/engine/exec/rel/ExchangeExecutionTest.java | 6 +- .../sql/engine/exec/rel/ExecutionTest.java | 5 ++ .../exec/rel/TableScanNodeExecutionTest.java | 8 
++- 32 files changed, 554 insertions(+), 18 deletions(-) diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java index 5716c1fbaa4..bc6eaac3b3c 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java @@ -37,6 +37,7 @@ import org.apache.calcite.linq4j.QueryProvider; import org.apache.calcite.schema.SchemaPlus; import org.apache.ignite.internal.lang.IgniteInternalCheckedException; import org.apache.ignite.internal.lang.IgniteInternalException; +import org.apache.ignite.internal.lang.IgniteSystemProperties; import org.apache.ignite.internal.lang.RunnableX; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; @@ -65,6 +66,8 @@ import org.jetbrains.annotations.Nullable; public class ExecutionContext<RowT> implements SqlEvaluationContext<RowT> { private static final IgniteLogger LOG = Loggers.forClass(ExecutionContext.class); + public static final boolean DUMP_METRICS = IgniteSystemProperties.getBoolean("IGNITE_DUMP_QUERY_METRICS_TO_LOGS", false); + /** * TODO: https://issues.apache.org/jira/browse/IGNITE-15276 Support other locales. 
*/ diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java index ce1e20f4a9c..35fbc350053 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java @@ -1054,6 +1054,7 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, LogicalTopo ); AsyncRootNode<RowT, InternalSqlRow> rootNode = new AsyncRootNode<>( + ectx, node, inRow -> new InternalSqlRowImpl<>(inRow, ectx.rowAccessor(), internalTypeConverter)); node.onRegister(rootNode); diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java index dfd117016ca..3d53320e8f7 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java @@ -575,6 +575,7 @@ public class LogicalRelImplementor<RowT> implements IgniteRelVisitor<Node<RowT>> return new TableScanNode<>( ctx, rowFactory, + tbl, scannableTable, partitionProvider, filters, diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractNode.java index e810e1c20dd..9c4b535fe10 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractNode.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractNode.java @@ -62,6 +62,11 @@ public abstract class AbstractNode<RowT> implements Node<RowT> { private 
List<Node<RowT>> sources; + // Metrics + protected int requestCount = 0; + protected int rewindCount = 0; + protected long receivedRowsCount = 0L; + /** * Constructor. * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859 @@ -112,6 +117,8 @@ public abstract class AbstractNode<RowT> implements Node<RowT> { /** {@inheritDoc} */ @Override public void rewind() { + onRewind(); + rewindInternal(); if (!nullOrEmpty(sources())) { @@ -219,4 +226,43 @@ public abstract class AbstractNode<RowT> implements Node<RowT> { Debuggable.dumpState(writer, Debuggable.childIndentation(indent), sources); } } + + @Override + public void dumpNodeMetrics(IgniteStringBuilder writer, String indent) { + writer.app(indent); + dumpMetrics0(writer); + writer.nl(); + + MetricsAwareNode.dumpChildNodesMetrics(writer, indent, sources); + } + + protected void dumpMetrics0(IgniteStringBuilder writer) { + writer.app(this.getClass().getSimpleName()).app(": "); + + writer.app("receivedRows=").app(receivedRowsCount); + + if (requestCount > 0) { + writer.app(", requests=").app(requestCount); + } + + if (rewindCount > 0) { + writer.app(", rewinds=").app(rewindCount); + } + } + + protected final void onRequestReceived() { + requestCount++; + } + + protected final void onRowReceived() { + receivedRowsCount++; + } + + protected final void onRowsReceived(long rowsCount) { + receivedRowsCount += rowsCount; + } + + protected final void onRewind() { + rewindCount++; + } } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractRightMaterializedJoinNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractRightMaterializedJoinNode.java index 1e81985add5..0a729417df6 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractRightMaterializedJoinNode.java +++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractRightMaterializedJoinNode.java @@ -34,6 +34,10 @@ public abstract class AbstractRightMaterializedJoinNode<RowT> extends AbstractNo final Deque<RowT> leftInBuf = new ArrayDeque<>(inBufSize); protected @Nullable RowT left; + // Metrics + private long receivedRowsFromLeft = 0L; + private long receiveRowsFromRight = 0L; + AbstractRightMaterializedJoinNode(ExecutionContext<RowT> ctx) { super(ctx); } @@ -44,6 +48,8 @@ public abstract class AbstractRightMaterializedJoinNode<RowT> extends AbstractNo assert !nullOrEmpty(sources()) && sources().size() == 2; assert rowsCnt > 0 && requested == 0; + onRequestReceived(); + requested = rowsCnt; if (!inLoop) { @@ -70,6 +76,8 @@ public abstract class AbstractRightMaterializedJoinNode<RowT> extends AbstractNo /** {@inheritDoc} */ @Override public void push(RowT row) throws Exception { + onRowReceivedFromLeft(); + pushLeft(row); } @@ -90,6 +98,8 @@ public abstract class AbstractRightMaterializedJoinNode<RowT> extends AbstractNo /** {@inheritDoc} */ @Override public void push(RowT row) throws Exception { + onRowReceivedFromRight(); + pushRight(row); } @@ -158,4 +168,22 @@ public abstract class AbstractRightMaterializedJoinNode<RowT> extends AbstractNo protected abstract void join() throws Exception; protected abstract void pushRight(RowT row) throws Exception; + + @Override + protected void dumpMetrics0(IgniteStringBuilder writer) { + // Calculate aggregated statistics. 
+ onRowsReceived(receivedRowsFromLeft + receiveRowsFromRight); + + super.dumpMetrics0(writer); + writer.app(", leftRows=").app(receivedRowsFromLeft) + .app(", rightRows=").app(receiveRowsFromRight); + } + + private void onRowReceivedFromLeft() { + receivedRowsFromLeft++; + } + + private void onRowReceivedFromRight() { + receiveRowsFromRight++; + } } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractSetOpNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractSetOpNode.java index c436f4fd999..79ea3281783 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractSetOpNode.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractSetOpNode.java @@ -65,6 +65,8 @@ public abstract class AbstractSetOpNode<RowT> extends AbstractNode<RowT> { assert rowsCnt > 0 && requested == 0; assert waiting <= 0; + onRequestReceived(); + requested = rowsCnt; if (waiting == 0) { @@ -132,6 +134,8 @@ public abstract class AbstractSetOpNode<RowT> extends AbstractNode<RowT> { return new Downstream<>() { @Override public void push(RowT row) throws Exception { + onRowReceived(); + AbstractSetOpNode.this.push(row, idx); } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java index dcc33bdd1ba..23a5048996d 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java @@ -24,12 +24,18 @@ import java.util.ArrayDeque; import java.util.ArrayList; import java.util.List; import java.util.Queue; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; 
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; +import org.apache.ignite.internal.lang.Debuggable; +import org.apache.ignite.internal.lang.IgniteStringBuilder; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.sql.engine.QueryCancelledException; +import org.apache.ignite.internal.sql.engine.exec.ExecutionContext; import org.apache.ignite.internal.util.AsyncCursor; import org.apache.ignite.lang.CursorClosedException; import org.jetbrains.annotations.Nullable; @@ -38,6 +44,7 @@ import org.jetbrains.annotations.Nullable; * An async iterator over the execution tree. */ public class AsyncRootNode<InRowT, OutRowT> implements Downstream<InRowT>, AsyncCursor<OutRowT> { + public static final IgniteLogger LOGGER = Loggers.forClass(AsyncRootNode.class); private final CompletableFuture<Void> cancelFut = new CompletableFuture<>(); /** @@ -69,15 +76,25 @@ public class AsyncRootNode<InRowT, OutRowT> implements Downstream<InRowT>, Async */ private int waiting; + // Metrics + private long rowsReceived; + private long queryStartTime = -1L; + private long prefetchTime = -1L; + private long queryTime = -1L; + private final long fragmentId; + private final UUID queryId; + /** * Constructor. * * @param source A source to requests rows from. * @param converter A converter to convert rows from an internal format to desired output format. 
*/ - public AsyncRootNode(AbstractNode<InRowT> source, Function<InRowT, OutRowT> converter) { + public AsyncRootNode(ExecutionContext<InRowT> ctx, AbstractNode<InRowT> source, Function<InRowT, OutRowT> converter) { this.source = source; this.converter = converter; + queryId = ctx.queryId(); + fragmentId = ctx.description().fragmentId(); } /** {@inheritDoc} */ @@ -85,6 +102,8 @@ public class AsyncRootNode<InRowT, OutRowT> implements Downstream<InRowT>, Async public void push(InRowT row) throws Exception { assert waiting > 0 : waiting; + onRowReceived(); + buff.add(converter.apply(row)); if (--waiting == 0) { @@ -104,6 +123,10 @@ public class AsyncRootNode<InRowT, OutRowT> implements Downstream<InRowT>, Async completePrefetchFuture(null); flush(); + + if (ExecutionContext.DUMP_METRICS) { + dumpQueryMetrics(); + } } /** {@inheritDoc} */ @@ -199,6 +222,8 @@ public class AsyncRootNode<InRowT, OutRowT> implements Downstream<InRowT>, Async public CompletableFuture<Void> startPrefetch() { assert source.context().description().prefetch(); + onQueryStarted(); + if (waiting == 0) { try { source.checkState(); @@ -259,6 +284,7 @@ public class AsyncRootNode<InRowT, OutRowT> implements Downstream<InRowT>, Async } else if (waiting == NOT_WAITING) { assert hasMore == HasMore.NO : hasMore; + onQueryFinish(); closeAsync(); } } else if (!pendingRequests.isEmpty()) { @@ -272,6 +298,8 @@ public class AsyncRootNode<InRowT, OutRowT> implements Downstream<InRowT>, Async private void scheduleTask() { if (!pendingRequests.isEmpty() && taskScheduled.compareAndSet(false, true)) { source.execute(() -> { + onQueryStarted(); + taskScheduled.set(false); flush(); @@ -286,6 +314,8 @@ public class AsyncRootNode<InRowT, OutRowT> implements Downstream<InRowT>, Async */ private void completePrefetchFuture(@Nullable Throwable ex) { if (!prefetchFut.isDone()) { + onPrefetchFinished(); + if (ex != null) { prefetchFut.completeExceptionally(ex); } else { @@ -320,4 +350,42 @@ public class 
AsyncRootNode<InRowT, OutRowT> implements Downstream<InRowT>, Async private enum HasMore { YES, NO, UNCERTAIN } + + private void dumpQueryMetrics() { + IgniteStringBuilder sb = new IgniteStringBuilder(); + sb.app("Dump metrics for executed query: queryId=").app(queryId).app(", fragmentId=").app(fragmentId).nl(); + sb.app("RootNode: rows=").app(rowsReceived) + .app(", prefetch=").app(MetricsAwareNode.beautifyNanoTime(prefetchTime)) + .app(", totalTime=").app(MetricsAwareNode.beautifyNanoTime((queryTime))) + .nl(); + + MetricsAwareNode.dumpChildNodesMetrics(sb, Debuggable.childIndentation(""), List.of(source)); + + LOGGER.info(sb.toString()); + } + + private void onRowReceived() { + rowsReceived++; + } + + private void onQueryStarted() { + if (queryStartTime == -1) { + queryStartTime = System.nanoTime(); + } + } + + private void onPrefetchFinished() { + if (prefetchTime == -1) { + prefetchTime = System.nanoTime() - queryStartTime; + } + } + + private void onQueryFinish() { + if (queryStartTime != -1) { + onPrefetchFinished(); + + queryTime += System.nanoTime() - queryStartTime; + queryStartTime = -1; + } + } } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/CorrelatedNestedLoopJoinNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/CorrelatedNestedLoopJoinNode.java index ecc5bbabe81..4fb0ac4d1f7 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/CorrelatedNestedLoopJoinNode.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/CorrelatedNestedLoopJoinNode.java @@ -74,6 +74,10 @@ public class CorrelatedNestedLoopJoinNode<RowT> extends AbstractNode<RowT> { INITIAL, FILLING_LEFT, FILLING_RIGHT, IDLE, IN_LOOP, END } + // Metrics + private long receivedRowsFromLeft = 0L; + private long receiveRowsFromRight = 0L; + /** * Creates CorrelatedNestedLoopJoin node. 
* @@ -114,6 +118,8 @@ public class CorrelatedNestedLoopJoinNode<RowT> extends AbstractNode<RowT> { assert !nullOrEmpty(sources()) && sources().size() == 2; assert rowsCnt > 0 && requested == 0; + onRequestReceived(); + requested = rowsCnt; onRequest(); @@ -196,6 +202,8 @@ public class CorrelatedNestedLoopJoinNode<RowT> extends AbstractNode<RowT> { assert downstream() != null; assert waitingLeft > 0; + onRowReceivedFromLeft(); + waitingLeft--; if (leftInBuf == null) { @@ -211,6 +219,8 @@ public class CorrelatedNestedLoopJoinNode<RowT> extends AbstractNode<RowT> { assert downstream() != null; assert waitingRight > 0; + onRowReceivedFromRight(); + waitingRight--; if (rightInBuf == null) { @@ -488,4 +498,22 @@ public class CorrelatedNestedLoopJoinNode<RowT> extends AbstractNode<RowT> { context().correlatedVariable(row, correlationIds.get(i).getId()); } } + + @Override + protected void dumpMetrics0(IgniteStringBuilder writer) { + // Calculate aggregated statistics. + onRowsReceived(receivedRowsFromLeft + receiveRowsFromRight); + + super.dumpMetrics0(writer); + writer.app(", leftRows=").app(receivedRowsFromLeft) + .app(", rightRows=").app(receiveRowsFromRight); + } + + private void onRowReceivedFromLeft() { + receivedRowsFromLeft++; + } + + private void onRowReceivedFromRight() { + receiveRowsFromRight++; + } } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/FilterNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/FilterNode.java index ce26c1beaf3..21e0246fc89 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/FilterNode.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/FilterNode.java @@ -40,6 +40,9 @@ public class FilterNode<RowT> extends AbstractNode<RowT> implements SingleNode<R private boolean inLoop; + // Metrics + private long filteredRows; + /** * Constructor. 
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859 @@ -59,6 +62,8 @@ public class FilterNode<RowT> extends AbstractNode<RowT> implements SingleNode<R assert !nullOrEmpty(sources()) && sources().size() == 1; assert rowsCnt > 0 && requested == 0; + onRequestReceived(); + requested = rowsCnt; if (!inLoop) { @@ -72,10 +77,14 @@ public class FilterNode<RowT> extends AbstractNode<RowT> implements SingleNode<R assert downstream() != null; assert waiting > 0; + onRowReceived(); + waiting--; if (pred.test(row)) { inBuf.add(row); + } else { + onRowFiltered(); } filter(); @@ -147,4 +156,14 @@ public class FilterNode<RowT> extends AbstractNode<RowT> implements SingleNode<R .app(", requested=").app(requested) .app(", waiting=").app(waiting); } + + @Override + protected void dumpMetrics0(IgniteStringBuilder writer) { + super.dumpMetrics0(writer); + writer.app(", filteredRows=").app(filteredRows); + } + + private void onRowFiltered() { + filteredRows++; + } } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/HashAggregateNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/HashAggregateNode.java index 0373336600b..ac608616065 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/HashAggregateNode.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/HashAggregateNode.java @@ -102,6 +102,8 @@ public class HashAggregateNode<RowT> extends AbstractNode<RowT> implements Singl assert rowsCnt > 0 && requested == 0; assert waiting <= 0; + onRequestReceived(); + requested = rowsCnt; if (waiting == 0) { @@ -117,6 +119,8 @@ public class HashAggregateNode<RowT> extends AbstractNode<RowT> implements Singl assert downstream() != null; assert waiting > 0; + onRowReceived(); + waiting--; for (Grouping grouping : groupings) { diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java index 5c2534de4f1..ab6331cb2b6 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java @@ -20,6 +20,8 @@ package org.apache.ignite.internal.sql.engine.exec.rel; import static org.apache.calcite.util.Util.unexpected; import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty; +import it.unimi.dsi.fastutil.ints.Int2LongArrayMap; +import it.unimi.dsi.fastutil.ints.Int2LongMap; import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; @@ -116,6 +118,8 @@ public class Inbox<RowT> extends AbstractNode<RowT> implements Mailbox<RowT>, Si public void request(int rowsCnt) throws Exception { assert rowsCnt > 0 && requested == 0; + onRequestReceived(); + requested = rowsCnt; if (!inLoop) { @@ -542,6 +546,14 @@ public class Inbox<RowT> extends AbstractNode<RowT> implements Mailbox<RowT>, Si */ private @Nullable SharedState sharedStateHolder = null; + // Metrics + Int2LongMap batchTimestamps = new Int2LongArrayMap(IO_BATCH_CNT); + long rowsReceived = 0L; + long batchesRequested = 0L; + long requestTotalTime = 0L; + long requestMinTime = Long.MAX_VALUE; + long requestMaxTime = 0L; + private RemoteSource(BatchRequester batchRequester) { this.batchRequester = batchRequester; } @@ -555,6 +567,7 @@ public class Inbox<RowT> extends AbstractNode<RowT> implements Mailbox<RowT>, Si void reset(SharedState state) { sharedStateHolder = state; batches.clear(); + batchTimestamps.clear(); this.lastEnqueued = lastRequested; this.state = State.WAITING; @@ -570,6 +583,8 @@ public class Inbox<RowT> extends AbstractNode<RowT> implements Mailbox<RowT>, Si return; } + batchReceived(id, rows.size()); + batches.offer(new Batch<>(id, 
last, rows)); if (state == State.WAITING && id == lastEnqueued + 1) { @@ -588,6 +603,8 @@ public class Inbox<RowT> extends AbstractNode<RowT> implements Mailbox<RowT>, Si if (maxInFlightCount / 2 >= currentInFlightCount) { int countOfBatches = maxInFlightCount - currentInFlightCount; + onBatchesRequested(countOfBatches); + lastRequested += countOfBatches; batchRequester.request(countOfBatches, sharedStateHolder); @@ -654,5 +671,52 @@ public class Inbox<RowT> extends AbstractNode<RowT> implements Mailbox<RowT>, Si lastEnqueued = curr.batchId; } + + void dumpMetrics(IgniteStringBuilder writer, String indent, String nodeName) { + writer.app(indent).app("Remote node: nodeName=").app(nodeName) + .app(", receivedRows=").app(rowsReceived) + .app(", batchesRequested=").app(batchesRequested); + + if (batchesRequested > 0) { + writer.app(", requestTime=[agv=").app(MetricsAwareNode.beautifyNanoTime(requestTotalTime / batchesRequested)) + .app(", min=").app(MetricsAwareNode.beautifyNanoTime(requestMinTime)) + .app(", max=").app(MetricsAwareNode.beautifyNanoTime(requestMaxTime)).app("]"); + } + } + + private void onBatchesRequested(int countOfBatches) { + long timestamp = System.nanoTime(); + batchesRequested += countOfBatches; + for (int i = 1; i <= countOfBatches; i++) { + batchTimestamps.put(lastRequested + i, timestamp); + } + } + + private void batchReceived(int id, int rows) { + this.rowsReceived += rows; + + long time = System.nanoTime() - batchTimestamps.remove(id); + this.requestTotalTime += time; + this.requestMinTime = Math.min(time, requestMinTime); + this.requestMaxTime = Math.max(time, requestMaxTime); + } + } + + @Override + @TestOnly + public void dumpNodeMetrics(IgniteStringBuilder writer, String indent) { + // Calculate aggregated statistics. 
+ onRowsReceived(perNodeBuffers.values().stream().mapToLong(n -> n.rowsReceived).sum()); + + writer.app(indent); + dumpMetrics0(writer); + writer.app(", fragmentId=").app(srcFragmentId); + writer.nl(); + + String childIndent = Debuggable.childIndentation(indent); + for (String nodeName : srcNodeNames) { + perNodeBuffers.get(nodeName).dumpMetrics(writer, childIndent, nodeName); + writer.nl(); + } } } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java index 38d4097b5e3..cd423dcab75 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java @@ -27,6 +27,7 @@ import java.util.function.Predicate; import java.util.stream.Collectors; import org.apache.calcite.rel.RelFieldCollation; import org.apache.calcite.util.ImmutableIntList; +import org.apache.ignite.internal.lang.IgniteStringBuilder; import org.apache.ignite.internal.sql.engine.api.expressions.RowFactory; import org.apache.ignite.internal.sql.engine.exec.ExecutionContext; import org.apache.ignite.internal.sql.engine.exec.PartitionProvider; @@ -157,4 +158,10 @@ public class IndexScanNode<RowT> extends StorageScanNode<RowT> { throw new AssertionError("Unexpected index type: " + schemaIndex.type()); } } + + @Override + protected void dumpMetrics0(IgniteStringBuilder writer) { + super.dumpMetrics0(writer); + writer.app(", indexName=").app(schemaIndex.name()); + } } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexSpoolNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexSpoolNode.java index 6ba21783076..4d4f26c4626 100644 --- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexSpoolNode.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexSpoolNode.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.sql.engine.exec.rel; import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty; import java.util.Comparator; +import java.util.List; import java.util.function.Predicate; import java.util.function.Supplier; import org.apache.calcite.rel.RelCollation; @@ -81,6 +82,8 @@ public class IndexSpoolNode<RowT> extends AbstractNode<RowT> implements SingleNo /** {@inheritDoc} */ @Override public void rewind() { + onRewind(); + rewindInternal(); } @@ -100,6 +103,8 @@ public class IndexSpoolNode<RowT> extends AbstractNode<RowT> implements SingleNo assert !nullOrEmpty(sources()) && sources().size() == 1; assert rowsCnt > 0; + onRequestReceived(); + if (!indexReady()) { requested = rowsCnt; @@ -112,6 +117,8 @@ public class IndexSpoolNode<RowT> extends AbstractNode<RowT> implements SingleNo /** {@inheritDoc} */ @Override public void push(RowT row) throws Exception { + onRowReceived(); + idx.push(row); waiting--; @@ -205,4 +212,13 @@ public class IndexSpoolNode<RowT> extends AbstractNode<RowT> implements SingleNo return new IndexSpoolNode<>(ctx, idx, scan); } + + @Override + public void dumpNodeMetrics(IgniteStringBuilder writer, String indent) { + writer.app(indent); + dumpMetrics0(writer); + writer.nl(); + + MetricsAwareNode.dumpChildNodesMetrics(writer, indent, List.of(scan)); + } } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/LimitNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/LimitNode.java index 48f00e9026b..ed7757c4b8f 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/LimitNode.java +++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/LimitNode.java @@ -43,6 +43,9 @@ public class LimitNode<RowT> extends AbstractNode<RowT> implements SingleNode<Ro /** Upper requested rows. */ private int requested; + // Metrics + private long filteredRows; + /** * Constructor. * @@ -66,6 +69,8 @@ public class LimitNode<RowT> extends AbstractNode<RowT> implements SingleNode<Ro assert !nullOrEmpty(sources()) && sources().size() == 1; assert rowsCnt > 0; + onRequestReceived(); + if (!hasMoreData()) { end(); @@ -91,18 +96,21 @@ public class LimitNode<RowT> extends AbstractNode<RowT> implements SingleNode<Ro /** {@inheritDoc} */ @Override public void push(RowT row) throws Exception { + onRowReceived(); + if (waiting == NOT_WAITING) { + onRowFiltered(); return; } --waiting; - if (rowsProcessed >= offset) { - if (hasMoreData()) { - // this two rows can`t be swapped, cause if all requested rows have been pushed it will trigger further request call. - --requested; - downstream().push(row); - } + if (rowsProcessed >= offset && hasMoreData()) { + // this two rows can`t be swapped, cause if all requested rows have been pushed it will trigger further request call. + --requested; + downstream().push(row); + } else { + onRowFiltered(); } ++rowsProcessed; @@ -120,6 +128,10 @@ public class LimitNode<RowT> extends AbstractNode<RowT> implements SingleNode<Ro } } + private long onRowFiltered() { + return filteredRows++; + } + /** {@inheritDoc} */ @Override public void end() throws Exception { @@ -162,6 +174,12 @@ public class LimitNode<RowT> extends AbstractNode<RowT> implements SingleNode<Ro .app(", rowsProcessed=").app(rowsProcessed); } + @Override + protected void dumpMetrics0(IgniteStringBuilder writer) { + super.dumpMetrics0(writer); + writer.app(", filteredRows=").app(filteredRows); + } + /** {@code True} if fetch is undefined, or current rows processed is less than required. 
*/ private boolean hasMoreData() { return fetchUndefined || rowsProcessed < fetch + offset; diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinNode.java index 3e0c40c9b00..ada91778f69 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinNode.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinNode.java @@ -53,6 +53,10 @@ public abstract class MergeJoinNode<RowT> extends AbstractNode<RowT> { protected boolean inLoop; + // Metrics + private long receivedRowsFromLeft = 0L; + private long receiveRowsFromRight = 0L; + /** * Creates MergeJoinNode. * @@ -71,6 +75,8 @@ public abstract class MergeJoinNode<RowT> extends AbstractNode<RowT> { assert !nullOrEmpty(sources()) && sources().size() == 2; assert rowsCnt > 0 && requested == 0; + onRequestReceived(); + requested = rowsCnt; if (!inLoop) { @@ -149,6 +155,8 @@ public abstract class MergeJoinNode<RowT> extends AbstractNode<RowT> { assert downstream() != null; assert waitingLeft > 0; + onRowReceivedFromLeft(); + waitingLeft--; leftInBuf.add(row); @@ -162,6 +170,8 @@ public abstract class MergeJoinNode<RowT> extends AbstractNode<RowT> { assert downstream() != null; assert waitingRight > 0; + onRowReceivedFromRight(); + waitingRight--; rightInBuf.add(row); @@ -1260,4 +1270,22 @@ public abstract class MergeJoinNode<RowT> extends AbstractNode<RowT> { } } } + + @Override + protected void dumpMetrics0(IgniteStringBuilder writer) { + // Calculate aggregated statistics. 
+ onRowsReceived(receivedRowsFromLeft + receiveRowsFromRight); + + super.dumpMetrics0(writer); + writer.app(", leftRows=").app(receivedRowsFromLeft) + .app(", rightRows=").app(receiveRowsFromRight); + } + + private void onRowReceivedFromLeft() { + receivedRowsFromLeft++; + } + + private void onRowReceivedFromRight() { + receiveRowsFromRight++; + } } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/MetricsAwareNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/MetricsAwareNode.java new file mode 100644 index 00000000000..34cde41e12d --- /dev/null +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/MetricsAwareNode.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.sql.engine.exec.rel; + +import java.util.concurrent.TimeUnit; +import org.apache.ignite.internal.lang.Debuggable; +import org.apache.ignite.internal.lang.IgniteStringBuilder; + +/** + * Interface provides method for dumping query metrics. + */ +interface MetricsAwareNode { + /** Helper method to dump child nodes metrics. 
*/ + static void dumpChildNodesMetrics(IgniteStringBuilder writer, String indent, Iterable<? extends MetricsAwareNode> nodes) { + if (nodes != null) { + String childIndent = Debuggable.childIndentation(indent); + nodes.forEach(s -> s.dumpNodeMetrics(writer, childIndent)); + } + } + + /** Helper method that formats a duration given in nanoseconds as a whole number of milliseconds followed by "ms". */ + static String beautifyNanoTime(long nanos) { + return TimeUnit.NANOSECONDS.toMillis(nanos) + "ms"; + } + + /** + * Dumps the current node metrics to the given writer. + * Note: the node is responsible for writing the provided indentation at the start of each new line. + */ + void dumpNodeMetrics(IgniteStringBuilder writer, String indent); +} diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNode.java index 1a0412ffea0..54b3af227ab 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNode.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNode.java @@ -157,6 +157,8 @@ public class ModifyNode<RowT> extends AbstractNode<RowT> implements SingleNode<R assert !nullOrEmpty(sources()) && sources().size() == 1; assert rowsCnt > 0 && requested == 0; + onRequestReceived(); + requested = rowsCnt; requestNextBatchIfNeeded(); @@ -168,6 +170,8 @@ public class ModifyNode<RowT> extends AbstractNode<RowT> implements SingleNode<R assert downstream() != null; assert waiting > 0; + onRowReceived(); + waiting--; rows.add(row); diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Node.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Node.java index 23934da2f05..a10cb19b364 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Node.java +++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Node.java @@ -28,7 +28,7 @@ import org.apache.ignite.internal.sql.engine.exec.ExecutionContext; * <b>Note</b>: except several cases (like consumer node and mailboxes), {@link Node#request(int)}, {@link Node#close()}, * {@link Downstream#push(Object)} and {@link Downstream#end()} methods should be used from one single thread. */ -public interface Node<RowT> extends AutoCloseable, Debuggable { +public interface Node<RowT> extends AutoCloseable, Debuggable, MetricsAwareNode { /** * Returns runtime context allowing access to the tables in a database. * diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java index 1e143cb973e..3411c035cf7 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java @@ -159,6 +159,8 @@ public class Outbox<RowT> extends AbstractNode<RowT> implements Mailbox<RowT>, S public void push(RowT row) throws Exception { assert waiting > 0 : waiting; + onRowReceived(); + waiting--; if (currentNode == null || dest.targets(row).contains(currentNode)) { @@ -176,6 +178,10 @@ public class Outbox<RowT> extends AbstractNode<RowT> implements Mailbox<RowT>, S waiting = NOT_WAITING; flush(); + + if (ExecutionContext.DUMP_METRICS) { + dumpFragmentMetrics(); + } } /** {@inheritDoc} */ @@ -189,6 +195,10 @@ public class Outbox<RowT> extends AbstractNode<RowT> implements Mailbox<RowT>, S /** {@inheritDoc} */ @Override public void closeInternal() { + if (waiting != NOT_WAITING && ExecutionContext.DUMP_METRICS) { + dumpFragmentMetrics(); + } + super.closeInternal(); registry.unregister(this); @@ -496,6 +506,11 @@ public class Outbox<RowT> extends AbstractNode<RowT> implements Mailbox<RowT>, S private 
@Nullable List<RowT> curr; private int pendingCount; + // Metrics + long batchesRequested = 0L; + long rowsReceived = 0L; + long rowsSent = 0L; + private RemoteDownstream(String nodeName, BatchSender<RowT> sender) { this.nodeName = nodeName; this.sender = sender; @@ -519,6 +534,8 @@ public class Outbox<RowT> extends AbstractNode<RowT> implements Mailbox<RowT>, S void onBatchRequested(int amountOfBatches) throws Exception { assert amountOfBatches > 0 : amountOfBatches; + batchesRequested += amountOfBatches; + this.pendingCount += amountOfBatches; // if there is a batch which is ready to be sent, then just sent it @@ -541,6 +558,8 @@ public class Outbox<RowT> extends AbstractNode<RowT> implements Mailbox<RowT>, S assert ready() : state; assert curr != null; + rowsReceived++; + curr.add(row); if (curr.size() == IO_BATCH_SIZE) { @@ -560,6 +579,8 @@ public class Outbox<RowT> extends AbstractNode<RowT> implements Mailbox<RowT>, S boolean lastBatch = state == State.LAST_BATCH; + rowsSent += curr.size(); + sender.send(nodeName, ++lastSentBatchId, lastBatch, curr); pendingCount--; @@ -612,4 +633,37 @@ public class Outbox<RowT> extends AbstractNode<RowT> implements Mailbox<RowT>, S this.amountOfBatches = amountOfBatches; } } + + private void dumpFragmentMetrics() { + IgniteStringBuilder sb = new IgniteStringBuilder("Dump metrics for executed query:") + .app(" queryId=").app(context().queryId()) + .app(", fragmentId=").app(context().fragmentId()) + .app(", nodeName=").app(context().localNode().name()) + .nl(); + + dumpNodeMetrics(sb, ""); + + LOG.info(sb.toString()); + } + + @Override + public void dumpNodeMetrics(IgniteStringBuilder writer, String indent) { + writer.app(indent); + dumpMetrics0(writer); + writer.nl(); + + String childIndent = Debuggable.childIndentation(indent); + String childIndent2 = Debuggable.childIndentation(childIndent); + + for (Entry<String, RemoteDownstream<RowT>> entry : this.nodeBuffers.entrySet()) { + RemoteDownstream<RowT> downstream = 
entry.getValue(); + writer.app(childIndent2) + .app("Node: nodeName=").app(entry.getKey()) + .app(", rowsSent=").app(downstream.rowsSent) + .app(", batchesRequested=").app(downstream.batchesRequested); + writer.nl(); + } + + MetricsAwareNode.dumpChildNodesMetrics(writer, childIndent, sources()); + } } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ProjectNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ProjectNode.java index 376c96ae4ad..ea731e973e1 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ProjectNode.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ProjectNode.java @@ -55,6 +55,8 @@ public class ProjectNode<RowT> extends AbstractNode<RowT> implements SingleNode< assert !nullOrEmpty(sources()) && sources().size() == 1; assert rowsCnt > 0; + onRequestReceived(); + source().request(rowsCnt); } @@ -63,6 +65,8 @@ public class ProjectNode<RowT> extends AbstractNode<RowT> implements SingleNode< public void push(RowT row) throws Exception { assert downstream() != null; + onRowReceived(); + downstream().push(prj.apply(row)); } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/RootNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/RootNode.java index fe17d26c01f..b5f0e438f51 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/RootNode.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/RootNode.java @@ -141,6 +141,8 @@ public class RootNode<RowT> extends AbstractNode<RowT> implements SingleNode<Row try { assert waiting > 0; + onRowReceived(); + waiting--; inBuff.offer(row); diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ScanNode.java 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ScanNode.java index 36632a0935b..7291b452e02 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ScanNode.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ScanNode.java @@ -66,6 +66,8 @@ public class ScanNode<RowT> extends AbstractNode<RowT> implements SingleNode<Row public void request(int rowsCnt) throws Exception { assert rowsCnt > 0 && requested == 0 : "rowsCnt=" + rowsCnt + ", requested=" + requested; + onRequestReceived(); + requested = rowsCnt; if (!inLoop) { diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/SortAggregateNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/SortAggregateNode.java index 6ea047b2d08..1c46c80f8cb 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/SortAggregateNode.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/SortAggregateNode.java @@ -105,6 +105,8 @@ public class SortAggregateNode<RowT> extends AbstractNode<RowT> implements Singl assert !nullOrEmpty(sources()) && sources().size() == 1; assert rowsCnt > 0 && requested == 0; + onRequestReceived(); + requested = rowsCnt; if (waiting == 0) { @@ -120,6 +122,8 @@ public class SortAggregateNode<RowT> extends AbstractNode<RowT> implements Singl assert downstream() != null; assert waiting > 0; + onRowReceived(); + waiting--; if (grp != null) { diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/SortNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/SortNode.java index e9b9a6bed36..5900270c3cd 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/SortNode.java +++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/SortNode.java @@ -126,6 +126,8 @@ public class SortNode<RowT> extends AbstractNode<RowT> implements SingleNode<Row assert rowsCnt > 0 && requested == 0; assert waiting <= 0; + onRequestReceived(); + if (fetch == 0) { downstream().end(); @@ -148,6 +150,8 @@ public class SortNode<RowT> extends AbstractNode<RowT> implements SingleNode<Row assert waiting > 0; assert reversed == null || reversed.isEmpty(); + onRowReceived(); + waiting--; rows.add(row); diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/StorageScanNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/StorageScanNode.java index ae248273261..a131d31907b 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/StorageScanNode.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/StorageScanNode.java @@ -36,7 +36,7 @@ import org.jetbrains.annotations.Nullable; public abstract class StorageScanNode<RowT> extends AbstractNode<RowT> { private Queue<RowT> inBuff = new LinkedBlockingQueue<>(inBufSize); - private final @Nullable Predicate<RowT> filters; + private final Predicate<RowT> filters; private final @Nullable Function<RowT, RowT> rowTransformer; @@ -51,6 +51,11 @@ public abstract class StorageScanNode<RowT> extends AbstractNode<RowT> { /** Flag that indicate scan method was called already. */ private boolean dataRequested; + // Metrics + private long filteredRows = 0L; + private long scanStartTime = -1L; + private long scanTime = 0L; + /** * Constructor. 
* @@ -76,6 +81,8 @@ public abstract class StorageScanNode<RowT> extends AbstractNode<RowT> { public void request(int rowsCnt) throws Exception { assert rowsCnt > 0 && requested == 0 : "rowsCnt=" + rowsCnt + ", requested=" + requested; + onRequestReceived(); + requested = rowsCnt; if (!inLoop) { @@ -136,6 +143,7 @@ public abstract class StorageScanNode<RowT> extends AbstractNode<RowT> { RowT row = inBuff.poll(); if (filters != null && !filters.test(row)) { + onRowFiltered(); continue; } @@ -180,6 +188,8 @@ public abstract class StorageScanNode<RowT> extends AbstractNode<RowT> { waiting = inBufSize - inBuff.size(); } + onScanStarted(); + Subscription subscription = this.activeSubscription; if (subscription != null) { subscription.request(waiting); @@ -233,8 +243,12 @@ public abstract class StorageScanNode<RowT> extends AbstractNode<RowT> { // It is safe not to be aware about already closed execution flow. inBuffInner.add(row); - if (inBuffInner.size() == inBufSize) { + int size = inBuffInner.size(); + if (size == inBufSize) { StorageScanNode.this.execute(() -> { + onRowsReceived(size); + onScanFinished(); + waiting = 0; push(); }); @@ -253,6 +267,9 @@ public abstract class StorageScanNode<RowT> extends AbstractNode<RowT> { @Override public void onComplete() { StorageScanNode.this.execute(() -> { + onRowsReceived(inBuff.size()); + onScanFinished(); + activeSubscription = null; waiting = 0; @@ -260,4 +277,38 @@ public abstract class StorageScanNode<RowT> extends AbstractNode<RowT> { }); } } + + @Override + protected void dumpMetrics0(IgniteStringBuilder writer) { + writer.app(this.getClass().getSimpleName()).app(": "); + + writer.app("scannedRows=").app(receivedRowsCount); + + if (requestCount > 0) { + writer.app(", requests=").app(requestCount); + } + + if (rewindCount > 0) { + writer.app(", rewinds=").app(rewindCount); + } + + if (filteredRows > 0) { + writer.app(", filteredRows=").app(filteredRows); + } + + writer.app(", 
scanTime=").app(MetricsAwareNode.beautifyNanoTime(scanTime)); + } + + private void onRowFiltered() { + filteredRows++; + } + + private void onScanStarted() { + scanStartTime = System.nanoTime(); + } + + private void onScanFinished() { + scanTime += System.nanoTime() - scanStartTime; + scanStartTime = -1L; + } } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNode.java index 25e6500ef79..afc559a5331 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNode.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNode.java @@ -23,11 +23,13 @@ import java.util.concurrent.Flow.Publisher; import java.util.function.Function; import java.util.function.Predicate; import org.apache.calcite.util.ImmutableIntList; +import org.apache.ignite.internal.lang.IgniteStringBuilder; import org.apache.ignite.internal.sql.engine.api.expressions.RowFactory; import org.apache.ignite.internal.sql.engine.exec.ExecutionContext; import org.apache.ignite.internal.sql.engine.exec.PartitionProvider; import org.apache.ignite.internal.sql.engine.exec.PartitionWithConsistencyToken; import org.apache.ignite.internal.sql.engine.exec.ScannableTable; +import org.apache.ignite.internal.sql.engine.schema.IgniteTable; import org.apache.ignite.internal.util.SubscriptionUtils; import org.apache.ignite.internal.util.TransformingIterator; import org.jetbrains.annotations.Nullable; @@ -47,11 +49,14 @@ public class TableScanNode<RowT> extends StorageScanNode<RowT> { private final int @Nullable [] requiredColumns; + private final String tableName; + /** * Constructor. * * @param ctx Execution context. * @param rowFactory Row factory. + * @param schemaTable Schema table. * @param table Internal table. 
* @param partitionProvider List of pairs containing the partition number to scan with the corresponding enlistment * consistency token. @@ -62,6 +67,7 @@ public class TableScanNode<RowT> extends StorageScanNode<RowT> { public TableScanNode( ExecutionContext<RowT> ctx, RowFactory<RowT> rowFactory, + IgniteTable schemaTable, ScannableTable table, PartitionProvider<RowT> partitionProvider, @Nullable Predicate<RowT> filters, @@ -74,6 +80,7 @@ public class TableScanNode<RowT> extends StorageScanNode<RowT> { this.partitionProvider = partitionProvider; this.rowFactory = rowFactory; this.requiredColumns = requiredColumns == null ? null : requiredColumns.toIntArray(); + this.tableName = schemaTable.name(); } /** {@inheritDoc} */ @@ -86,4 +93,10 @@ public class TableScanNode<RowT> extends StorageScanNode<RowT> { return SubscriptionUtils.concat(it); } + + @Override + protected void dumpMetrics0(IgniteStringBuilder writer) { + super.dumpMetrics0(writer); + writer.app(", tableName=").app(tableName); + } } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/TableSpoolNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/TableSpoolNode.java index ec45a488875..ce793912c54 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/TableSpoolNode.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/TableSpoolNode.java @@ -77,6 +77,8 @@ public class TableSpoolNode<RowT> extends AbstractNode<RowT> implements SingleNo /** {@inheritDoc} */ @Override public void rewind() { + onRewind(); + rewindInternal(); } @@ -142,6 +144,8 @@ public class TableSpoolNode<RowT> extends AbstractNode<RowT> implements SingleNo assert downstream() != null; assert waiting > 0; + onRowReceived(); + waiting--; rows.add(row); diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/UnionAllNode.java 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/UnionAllNode.java index 1fbfe213bd7..d41e7bd074c 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/UnionAllNode.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/UnionAllNode.java @@ -56,6 +56,8 @@ public class UnionAllNode<RowT> extends AbstractNode<RowT> implements Downstream assert !nullOrEmpty(sources()); assert rowsCnt > 0 && waiting == 0; + onRequestReceived(); + source().request(waiting = rowsCnt); } @@ -65,6 +67,8 @@ public class UnionAllNode<RowT> extends AbstractNode<RowT> implements Downstream assert downstream() != null; assert waiting > 0; + onRowReceived(); + waiting--; downstream().push(row); diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNodeTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNodeTest.java index 45109a745b6..d4e1adc4d5a 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNodeTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNodeTest.java @@ -80,7 +80,7 @@ class AsyncRootNodeTest extends AbstractExecutionTest<RowWrapper> { null ); - var rootNode = new AsyncRootNode<>(dataSourceScanNode, Function.identity()); + var rootNode = new AsyncRootNode<>(context, dataSourceScanNode, Function.identity()); rootNode.requestNextAsync(1); @@ -123,7 +123,7 @@ class AsyncRootNodeTest extends AbstractExecutionTest<RowWrapper> { null ); - AsyncRootNode<RowWrapper, RowWrapper> rootNode = new AsyncRootNode<>(dataSourceScanNode, Function.identity()); + AsyncRootNode<RowWrapper, RowWrapper> rootNode = new AsyncRootNode<>(context, dataSourceScanNode, Function.identity()); dataSourceScanNode.onRegister(rootNode); // trigger prefetch @@ -171,7 +171,7 @@ class AsyncRootNodeTest extends 
AbstractExecutionTest<RowWrapper> { return IntStream.range(0, 76).mapToObj(factory::create).iterator(); }); - AsyncRootNode<RowWrapper, RowWrapper> rootNode = new AsyncRootNode<>(scanNode, Function.identity()); + AsyncRootNode<RowWrapper, RowWrapper> rootNode = new AsyncRootNode<>(context, scanNode, Function.identity()); scanNode.onRegister(rootNode); // trigger prefetch diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExchangeExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExchangeExecutionTest.java index 403422e9d72..614981419ba 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExchangeExecutionTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExchangeExecutionTest.java @@ -531,7 +531,7 @@ public class ExchangeExecutionTest extends AbstractExecutionTest<Object[]> { } RewindableAsyncRoot<Object[], Object[]> root = new RewindableAsyncRoot<>( - node, Function.identity() + targetCtx, node, Function.identity() ); node.onRegister(root); @@ -665,8 +665,8 @@ public class ExchangeExecutionTest extends AbstractExecutionTest<Object[]> { * @param source A source to requests rows from. * @param converter A converter to convert rows from an internal format to desired output format. 
*/ - RewindableAsyncRoot(AbstractNode<InT> source, Function<InT, OutT> converter) { - super(source, converter); + RewindableAsyncRoot(ExecutionContext<InT> ctx, AbstractNode<InT> source, Function<InT, OutT> converter) { + super(ctx, source, converter); } @Override diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExecutionTest.java index 8394a75b869..fb61d4e6ebe 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExecutionTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExecutionTest.java @@ -427,6 +427,11 @@ public class ExecutionTest extends AbstractExecutionTest<Object[]> { public void dumpState(IgniteStringBuilder writer, String indent) { writer.app(indent).app("class=").app(getClass().getSimpleName()).nl(); } + + @Override + public void dumpNodeMetrics(IgniteStringBuilder writer, String indent) { + writer.app(indent).app(getClass().getSimpleName()).nl(); + } } @Override diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java index 5db917e662c..7c211a55ff7 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java @@ -79,6 +79,7 @@ import org.apache.ignite.internal.sql.engine.exec.TableRowConverter; import org.apache.ignite.internal.sql.engine.framework.ArrayRowHandler; import org.apache.ignite.internal.sql.engine.framework.DataProvider; import org.apache.ignite.internal.sql.engine.framework.TestBuilders; +import org.apache.ignite.internal.sql.engine.schema.IgniteTable; 
import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory; import org.apache.ignite.internal.sql.engine.util.TypeUtils; import org.apache.ignite.internal.storage.engine.MvTableStorage; @@ -228,7 +229,8 @@ public class TableScanNodeExecutionTest extends AbstractExecutionTest<Object[]> }; ScannableTableImpl scanableTable = new ScannableTableImpl(internalTable, rf -> rowConverter); PartitionProvider<Object[]> partitionProvider = PartitionProvider.fromPartitions(partsWithConsistencyTokens); - TableScanNode<Object[]> scanNode = new TableScanNode<>(ctx, rowFactory, scanableTable, + IgniteTable schemaTable = mock(IgniteTable.class); + TableScanNode<Object[]> scanNode = new TableScanNode<>(ctx, rowFactory, schemaTable, scanableTable, partitionProvider, null, null, null); RootNode<Object[]> root = new RootNode<>(ctx); @@ -282,7 +284,9 @@ public class TableScanNodeExecutionTest extends AbstractExecutionTest<Object[]> RowFactory<Object[]> rowFactory = ctx.rowFactoryFactory().create(schema); ScannableTable scannableTable = TestBuilders.tableScan(DataProvider.fromRow(new Object[]{42}, partDataSize)); - TableScanNode<Object[]> scanNode = new TableScanNode<>(ctx, rowFactory, scannableTable, c -> partitions, null, null, null); + IgniteTable schemaTable = mock(IgniteTable.class); + TableScanNode<Object[]> scanNode = new TableScanNode<>(ctx, rowFactory, schemaTable, scannableTable, + c -> partitions, null, null, null); RootNode<Object[]> rootNode = new RootNode<>(ctx); rootNode.register(scanNode);
