zstan commented on code in PR #13004:
URL: https://github.com/apache/ignite/pull/13004#discussion_r3062380070
##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java:
##########
Review Comment:
it really need to be removed
##########
modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java:
##########
@@ -921,73 +924,72 @@ private void checkStarvation() throws Exception {
/** */
@Test
- public void testUdfQueryWarningStripedExecutor() throws Exception {
- assertTrue(queryProcessor(grid(0)).taskExecutor() instanceof
StripedQueryTaskExecutor);
+ public void testUdfQueryDeadlockDetectionStripedExecutor() throws
Exception {
+ IgniteEx ignite = grid(0);
+
+ assertTrue(queryProcessor(ignite).taskExecutor() instanceof
StripedQueryTaskExecutor);
+
+ client.getOrCreateCache(new CacheConfiguration<Integer,
Integer>(DEFAULT_CACHE_NAME)
+ .setSqlFunctionClasses(FunctionsLibrary.class)
+ .setSqlSchema("PUBLIC")
+ );
+
+ // Expect message with tips about switching to query blocking task
executor.
+ String expMsg =
"-DIGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR=true";
Review Comment:
```suggestion
String expMsg = IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR +
"=true";
```
##########
modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java:
##########
@@ -184,6 +184,18 @@ protected void assertThrows(String sql, Class<? extends
Exception> cls, String m
assertThrowsAnyCause(log, () -> sql(sql, args), cls, msg);
}
+ /**
+ * Asserts that executeSql throws an exception.
Review Comment:
nitpicking. not _executeSql_ but _sql_ it was changed in IGNITE-22717 but
java doc is the same
##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java:
##########
@@ -574,6 +582,38 @@ private FieldsQueryCursor<List<?>>
executeDdl(RootQuery<Row> qry, DdlPlan plan)
}
}
+ /**
+ * Checks that query is initiated by UDF.
+ *
+ * @return {@code True} if query is initiated by UDF (in this case UDF
query limit is affected).
+ * @throws IgniteSQLException If query execution can lead to deadlocks.
+ */
+ private boolean checkUdfQuery() {
+ if
(Thread.currentThread().getName().startsWith(AbstractQueryTaskExecutor.THREAD_PREFIX))
{
+ if (taskExecutor instanceof QueryBlockingTaskExecutor) {
+ if (udfQryLimit.getAndDecrement() <= 0) {
Review Comment:
why udfQryLimit.getAndDecrement() **<** 0 is not enough here ? seems some
description is helpful here
##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java:
##########
@@ -594,207 +634,218 @@ private ListFieldsQueryCursor<?> mapAndExecutePlan(
checkPermissions(fragment.root());
}
- // Local execution
- Fragment fragment = F.first(fragments);
+ boolean udfQry = checkUdfQuery();
+
+ try {
+ // Local execution
+ Fragment fragment = F.first(fragments);
- if (U.assertionsEnabled()) {
- assert fragment != null;
+ if (U.assertionsEnabled()) {
+ assert fragment != null;
- FragmentMapping mapping = execPlan.mapping(fragment);
+ FragmentMapping mapping = execPlan.mapping(fragment);
- assert mapping != null;
+ assert mapping != null;
- List<UUID> nodes = mapping.nodeIds();
+ List<UUID> nodes = mapping.nodeIds();
- assert nodes != null && (nodes.size() == 1 &&
F.first(nodes).equals(localNodeId()) || nodes.isEmpty())
+ assert nodes != null && (nodes.size() == 1 &&
F.first(nodes).equals(localNodeId()) || nodes.isEmpty())
: "nodes=" + nodes + ", localNode=" + localNodeId();
- }
+ }
- long timeout = qry.remainingTime();
+ long timeout = qry.remainingTime();
- if (timeout == 0) {
- throw new IgniteSQLException("The query was cancelled due to
timeout", IgniteQueryErrorCode.QUERY_CANCELED,
- new QueryCancelledException());
- }
+ if (timeout == 0) {
+ throw new IgniteSQLException("The query was cancelled due to
timeout", IgniteQueryErrorCode.QUERY_CANCELED,
+ new QueryCancelledException());
+ }
- FragmentDescription fragmentDesc = new FragmentDescription(
- fragment.fragmentId(),
- execPlan.mapping(fragment),
- execPlan.target(fragment),
- execPlan.remotes(fragment));
-
- MemoryTracker qryMemoryTracker =
qry.createMemoryTracker(memoryTracker, cfg.getQueryMemoryQuota());
-
- final GridNearTxLocal userTx = Commons.queryTransaction(qry.context(),
ctx.cache().context());
-
- ExecutionContext<Row> ectx = new ExecutionContext<>(
- qry.context(),
- taskExecutor(),
- injectSvc,
- qry.id(),
- locNodeId,
- locNodeId,
- mapCtx.topologyVersion(),
- fragmentDesc,
- handler,
- qryMemoryTracker,
- createIoTracker(locNodeId, qry.localQueryId()),
- timeout,
- qryParams,
- userTx == null ? null :
ExecutionContext.transactionChanges(userTx.writeEntries()));
-
- Node<Row> node = new LogicalRelImplementor<>(ectx, partitionService(),
mailboxRegistry(),
- exchangeService(), failureProcessor()).go(fragment.root());
-
- qry.run(ectx, execPlan, plan.fieldsMetadata(), node);
-
- Map<UUID, Long> fragmentsPerNode = fragments.stream()
- .skip(1)
- .flatMap(f -> f.mapping().nodeIds().stream())
- .collect(Collectors.groupingBy(Function.identity(),
Collectors.counting()));
-
- // Start remote execution.
- for (int i = 1; i < fragments.size(); i++) {
- fragment = fragments.get(i);
- fragmentDesc = new FragmentDescription(
+ FragmentDescription fragmentDesc = new FragmentDescription(
fragment.fragmentId(),
execPlan.mapping(fragment),
execPlan.target(fragment),
execPlan.remotes(fragment));
- Throwable ex = null;
- byte[] parametersMarshalled = null;
-
- for (UUID nodeId : fragmentDesc.nodeIds()) {
- if (ex != null)
- qry.onResponse(nodeId, fragment.fragmentId(), ex);
- else {
- try {
- SessionContextImpl sesCtx =
qry.context().unwrap(SessionContextImpl.class);
-
- QueryProperties props =
qry.context().unwrap(QueryProperties.class);
- boolean keepBinaryMode = props == null ||
props.keepBinary();
-
- QueryStartRequest req = new QueryStartRequest(
- qry.id(),
- qry.localQueryId(),
- qry.context().schemaName(),
- fragment.serialized(),
- ectx.topologyVersion(),
- fragmentDesc,
- fragmentsPerNode.get(nodeId).intValue(),
- qry.parameters(),
- parametersMarshalled,
- timeout,
- ectx.getQryTxEntries(),
- sesCtx == null ? null : sesCtx.attributes(),
- keepBinaryMode
- );
-
- messageService().send(nodeId, req);
-
- // Avoid marshaling of the same parameters for other
nodes.
- if (parametersMarshalled == null)
- parametersMarshalled = req.parametersMarshalled();
- }
- catch (Throwable e) {
- qry.onResponse(nodeId, fragment.fragmentId(), ex = e);
+ MemoryTracker qryMemoryTracker =
qry.createMemoryTracker(memoryTracker, cfg.getQueryMemoryQuota());
+
+ final GridNearTxLocal userTx =
Commons.queryTransaction(qry.context(), ctx.cache().context());
+
+ ExecutionContext<Row> ectx = new ExecutionContext<>(
+ qry.context(),
+ taskExecutor(),
+ injectSvc,
+ qry.id(),
+ locNodeId,
+ locNodeId,
+ mapCtx.topologyVersion(),
+ fragmentDesc,
+ handler,
+ qryMemoryTracker,
+ createIoTracker(locNodeId, qry.localQueryId()),
+ timeout,
+ qryParams,
+ userTx == null ? null :
ExecutionContext.transactionChanges(userTx.writeEntries()));
+
+ Node<Row> node = new LogicalRelImplementor<>(ectx,
partitionService(), mailboxRegistry(),
+ exchangeService(), failureProcessor()).go(fragment.root());
+
+ qry.run(ectx, execPlan, plan.fieldsMetadata(), node);
+
+ Map<UUID, Long> fragmentsPerNode = fragments.stream()
+ .skip(1)
+ .flatMap(f -> f.mapping().nodeIds().stream())
+ .collect(Collectors.groupingBy(Function.identity(),
Collectors.counting()));
+
+ QueryProperties qryProps =
qry.context().unwrap(QueryProperties.class);
+ boolean keepBinary = qryProps == null || qryProps.keepBinary();
+
+ // Start remote execution.
+ for (int i = 1; i < fragments.size(); i++) {
+ fragment = fragments.get(i);
+ fragmentDesc = new FragmentDescription(
+ fragment.fragmentId(),
+ execPlan.mapping(fragment),
+ execPlan.target(fragment),
+ execPlan.remotes(fragment));
+
+ Throwable ex = null;
+ byte[] parametersMarshalled = null;
+
+ for (UUID nodeId : fragmentDesc.nodeIds()) {
+ if (ex != null)
+ qry.onResponse(nodeId, fragment.fragmentId(), ex);
+ else {
+ try {
+ SessionContextImpl sesCtx =
qry.context().unwrap(SessionContextImpl.class);
+
+ QueryStartRequest req = new QueryStartRequest(
+ qry.id(),
+ qry.localQueryId(),
+ qry.context().schemaName(),
+ fragment.serialized(),
+ ectx.topologyVersion(),
+ fragmentDesc,
+ fragmentsPerNode.get(nodeId).intValue(),
+ qry.parameters(),
+ parametersMarshalled,
+ timeout,
+ ectx.getQryTxEntries(),
+ sesCtx == null ? null : sesCtx.attributes(),
+ keepBinary
+ );
+
+ messageService().send(nodeId, req);
+
+ // Avoid marshaling of the same parameters for
other nodes.
+ if (parametersMarshalled == null)
+ parametersMarshalled =
req.parametersMarshalled();
+ }
+ catch (Throwable e) {
+ qry.onResponse(nodeId, fragment.fragmentId(), ex =
e);
+ }
}
}
}
- }
-
- if (perfStatProc.enabled()) {
- perfStatProc.queryProperty(
- GridCacheQueryType.SQL_FIELDS,
- qry.initiatorNodeId(),
- qry.localQueryId(),
- "Query plan",
- plan.textPlan()
- );
- }
- if (ctx.query().runningQueryManager().planHistoryTracker().enabled()) {
- ctx.query().runningQueryManager().planHistoryTracker().addPlan(
- plan.textPlan(),
- qry.sql(),
- qry.context().schemaName(),
- qry.context().isLocal(),
- CalciteQueryEngineConfiguration.ENGINE_NAME
- );
- }
+ if (perfStatProc.enabled()) {
+ perfStatProc.queryProperty(
+ GridCacheQueryType.SQL_FIELDS,
+ qry.initiatorNodeId(),
+ qry.localQueryId(),
+ "Query plan",
+ plan.textPlan()
+ );
+ }
- QueryProperties qryProps = qry.context().unwrap(QueryProperties.class);
+ if
(ctx.query().runningQueryManager().planHistoryTracker().enabled()) {
+ ctx.query().runningQueryManager().planHistoryTracker().addPlan(
+ plan.textPlan(),
+ qry.sql(),
+ qry.context().schemaName(),
+ qry.context().isLocal(),
+ CalciteQueryEngineConfiguration.ENGINE_NAME
+ );
+ }
- Function<Object, Object> fieldConverter = (qryProps == null ||
qryProps.keepBinary()) ? null :
- o -> CacheObjectUtils.unwrapBinaryIfNeeded(objValCtx, o, false,
true, null);
+ Function<Object, Object> fieldConverter = keepBinary ? null :
+ o -> CacheObjectUtils.unwrapBinaryIfNeeded(objValCtx, o,
false, true, null);
+
+ HeavyQueriesTracker.ResultSetChecker resultSetChecker =
ctx.query().runningQueryManager()
+ .heavyQueriesTracker().resultSetChecker(qry);
+
+ Function<List<Object>, List<Object>> rowConverter;
+
+ // Fire EVT_CACHE_QUERY_OBJECT_READ on initiator node before
return result to cursor.
+ if (qryProps != null && qryProps.cacheName() != null &&
evtMgr.isRecordable(EVT_CACHE_QUERY_OBJECT_READ)) {
+ ClusterNode locNode = ctx.discovery().localNode();
+ UUID subjId = SecurityUtils.securitySubjectId(ctx);
+
+ rowConverter = row -> {
+ evtMgr.record(new CacheQueryReadEvent<>(
+ locNode,
+ "SQL fields query result set row read.",
+ EVT_CACHE_QUERY_OBJECT_READ,
+ CacheQueryType.SQL_FIELDS.name(),
+ qryProps.cacheName(),
+ null,
+ qry.sql(),
+ null,
+ null,
+ qry.parameters(),
+ subjId,
+ null,
+ null,
+ null,
+ null,
+ row));
+
+ resultSetChecker.checkOnFetchNext();
+
+ return row;
+ };
+ }
+ else {
+ rowConverter = row -> {
+ resultSetChecker.checkOnFetchNext();
- HeavyQueriesTracker.ResultSetChecker resultSetChecker =
ctx.query().runningQueryManager()
- .heavyQueriesTracker().resultSetChecker(qry);
+ return row;
+ };
+ }
- Function<List<Object>, List<Object>> rowConverter;
+ Runnable onClose = () -> {
+ if (udfQry) // Restore UDF queries limit.
+ udfQryLimit.getAndIncrement();
+
+ if (perfStatProc.enabled()) {
+ perfStatProc.queryRowsProcessed(
+ GridCacheQueryType.SQL_FIELDS,
+ qry.initiatorNodeId(),
+ qry.localQueryId(),
+ "Fetched",
+ resultSetChecker.fetchedSize()
+ );
+ }
- // Fire EVT_CACHE_QUERY_OBJECT_READ on initiator node before return
result to cursor.
- if (qryProps != null && qryProps.cacheName() != null &&
evtMgr.isRecordable(EVT_CACHE_QUERY_OBJECT_READ)) {
- ClusterNode locNode = ctx.discovery().localNode();
- UUID subjId = SecurityUtils.securitySubjectId(ctx);
+ resultSetChecker.checkOnClose();
+ };
- rowConverter = row -> {
- evtMgr.record(new CacheQueryReadEvent<>(
- locNode,
- "SQL fields query result set row read.",
- EVT_CACHE_QUERY_OBJECT_READ,
- CacheQueryType.SQL_FIELDS.name(),
- qryProps.cacheName(),
- null,
- qry.sql(),
- null,
- null,
- qry.parameters(),
- subjId,
- null,
- null,
- null,
- null,
- row));
+ Iterator<List<?>> it = iteratorsHolder().iterator(new
ConvertingClosableIterator<>(qry.iterator(), ectx,
+ fieldConverter, rowConverter, onClose));
- resultSetChecker.checkOnFetchNext();
+ // Make yet another tracking layer for cursor.getAll(), so
tracking hierarchy will look like:
+ // Row tracker -> Cursor memory tracker -> Query memory tracker ->
Global memory tracker.
+ // It's required, since query memory tracker can be closed
concurrently during getAll() and
+ // tracked data for cursor can be lost without additional tracker.
+ MemoryTracker curMemoryTracker =
QueryMemoryTracker.create(qryMemoryTracker, cfg.getQueryMemoryQuota());
- return row;
- };
+ return new ListFieldsQueryCursor<>(plan, it, ectx,
curMemoryTracker);
}
- else {
- rowConverter = row -> {
- resultSetChecker.checkOnFetchNext();
+ catch (Exception e) {
+ if (udfQry) // Restore UDF queries limit.
Review Comment:
this is not a restore limit ) just a decrement
##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java:
##########
@@ -594,207 +634,218 @@ private ListFieldsQueryCursor<?> mapAndExecutePlan(
checkPermissions(fragment.root());
}
- // Local execution
- Fragment fragment = F.first(fragments);
+ boolean udfQry = checkUdfQuery();
+
+ try {
+ // Local execution
+ Fragment fragment = F.first(fragments);
- if (U.assertionsEnabled()) {
- assert fragment != null;
+ if (U.assertionsEnabled()) {
+ assert fragment != null;
- FragmentMapping mapping = execPlan.mapping(fragment);
+ FragmentMapping mapping = execPlan.mapping(fragment);
- assert mapping != null;
+ assert mapping != null;
- List<UUID> nodes = mapping.nodeIds();
+ List<UUID> nodes = mapping.nodeIds();
- assert nodes != null && (nodes.size() == 1 &&
F.first(nodes).equals(localNodeId()) || nodes.isEmpty())
+ assert nodes != null && (nodes.size() == 1 &&
F.first(nodes).equals(localNodeId()) || nodes.isEmpty())
: "nodes=" + nodes + ", localNode=" + localNodeId();
- }
+ }
- long timeout = qry.remainingTime();
+ long timeout = qry.remainingTime();
- if (timeout == 0) {
- throw new IgniteSQLException("The query was cancelled due to
timeout", IgniteQueryErrorCode.QUERY_CANCELED,
- new QueryCancelledException());
- }
+ if (timeout == 0) {
+ throw new IgniteSQLException("The query was cancelled due to
timeout", IgniteQueryErrorCode.QUERY_CANCELED,
+ new QueryCancelledException());
+ }
- FragmentDescription fragmentDesc = new FragmentDescription(
- fragment.fragmentId(),
- execPlan.mapping(fragment),
- execPlan.target(fragment),
- execPlan.remotes(fragment));
-
- MemoryTracker qryMemoryTracker =
qry.createMemoryTracker(memoryTracker, cfg.getQueryMemoryQuota());
-
- final GridNearTxLocal userTx = Commons.queryTransaction(qry.context(),
ctx.cache().context());
-
- ExecutionContext<Row> ectx = new ExecutionContext<>(
- qry.context(),
- taskExecutor(),
- injectSvc,
- qry.id(),
- locNodeId,
- locNodeId,
- mapCtx.topologyVersion(),
- fragmentDesc,
- handler,
- qryMemoryTracker,
- createIoTracker(locNodeId, qry.localQueryId()),
- timeout,
- qryParams,
- userTx == null ? null :
ExecutionContext.transactionChanges(userTx.writeEntries()));
-
- Node<Row> node = new LogicalRelImplementor<>(ectx, partitionService(),
mailboxRegistry(),
- exchangeService(), failureProcessor()).go(fragment.root());
-
- qry.run(ectx, execPlan, plan.fieldsMetadata(), node);
-
- Map<UUID, Long> fragmentsPerNode = fragments.stream()
- .skip(1)
- .flatMap(f -> f.mapping().nodeIds().stream())
- .collect(Collectors.groupingBy(Function.identity(),
Collectors.counting()));
-
- // Start remote execution.
- for (int i = 1; i < fragments.size(); i++) {
- fragment = fragments.get(i);
- fragmentDesc = new FragmentDescription(
+ FragmentDescription fragmentDesc = new FragmentDescription(
fragment.fragmentId(),
execPlan.mapping(fragment),
execPlan.target(fragment),
execPlan.remotes(fragment));
- Throwable ex = null;
- byte[] parametersMarshalled = null;
-
- for (UUID nodeId : fragmentDesc.nodeIds()) {
- if (ex != null)
- qry.onResponse(nodeId, fragment.fragmentId(), ex);
- else {
- try {
- SessionContextImpl sesCtx =
qry.context().unwrap(SessionContextImpl.class);
-
- QueryProperties props =
qry.context().unwrap(QueryProperties.class);
- boolean keepBinaryMode = props == null ||
props.keepBinary();
-
- QueryStartRequest req = new QueryStartRequest(
- qry.id(),
- qry.localQueryId(),
- qry.context().schemaName(),
- fragment.serialized(),
- ectx.topologyVersion(),
- fragmentDesc,
- fragmentsPerNode.get(nodeId).intValue(),
- qry.parameters(),
- parametersMarshalled,
- timeout,
- ectx.getQryTxEntries(),
- sesCtx == null ? null : sesCtx.attributes(),
- keepBinaryMode
- );
-
- messageService().send(nodeId, req);
-
- // Avoid marshaling of the same parameters for other
nodes.
- if (parametersMarshalled == null)
- parametersMarshalled = req.parametersMarshalled();
- }
- catch (Throwable e) {
- qry.onResponse(nodeId, fragment.fragmentId(), ex = e);
+ MemoryTracker qryMemoryTracker =
qry.createMemoryTracker(memoryTracker, cfg.getQueryMemoryQuota());
+
+ final GridNearTxLocal userTx =
Commons.queryTransaction(qry.context(), ctx.cache().context());
+
+ ExecutionContext<Row> ectx = new ExecutionContext<>(
+ qry.context(),
+ taskExecutor(),
+ injectSvc,
+ qry.id(),
+ locNodeId,
+ locNodeId,
+ mapCtx.topologyVersion(),
+ fragmentDesc,
+ handler,
+ qryMemoryTracker,
+ createIoTracker(locNodeId, qry.localQueryId()),
+ timeout,
+ qryParams,
+ userTx == null ? null :
ExecutionContext.transactionChanges(userTx.writeEntries()));
+
+ Node<Row> node = new LogicalRelImplementor<>(ectx,
partitionService(), mailboxRegistry(),
+ exchangeService(), failureProcessor()).go(fragment.root());
+
+ qry.run(ectx, execPlan, plan.fieldsMetadata(), node);
+
+ Map<UUID, Long> fragmentsPerNode = fragments.stream()
+ .skip(1)
+ .flatMap(f -> f.mapping().nodeIds().stream())
+ .collect(Collectors.groupingBy(Function.identity(),
Collectors.counting()));
+
+ QueryProperties qryProps =
qry.context().unwrap(QueryProperties.class);
+ boolean keepBinary = qryProps == null || qryProps.keepBinary();
+
+ // Start remote execution.
+ for (int i = 1; i < fragments.size(); i++) {
+ fragment = fragments.get(i);
+ fragmentDesc = new FragmentDescription(
+ fragment.fragmentId(),
+ execPlan.mapping(fragment),
+ execPlan.target(fragment),
+ execPlan.remotes(fragment));
+
+ Throwable ex = null;
+ byte[] parametersMarshalled = null;
+
+ for (UUID nodeId : fragmentDesc.nodeIds()) {
+ if (ex != null)
+ qry.onResponse(nodeId, fragment.fragmentId(), ex);
+ else {
+ try {
+ SessionContextImpl sesCtx =
qry.context().unwrap(SessionContextImpl.class);
+
+ QueryStartRequest req = new QueryStartRequest(
+ qry.id(),
+ qry.localQueryId(),
+ qry.context().schemaName(),
+ fragment.serialized(),
+ ectx.topologyVersion(),
+ fragmentDesc,
+ fragmentsPerNode.get(nodeId).intValue(),
+ qry.parameters(),
+ parametersMarshalled,
+ timeout,
+ ectx.getQryTxEntries(),
+ sesCtx == null ? null : sesCtx.attributes(),
+ keepBinary
+ );
+
+ messageService().send(nodeId, req);
+
+ // Avoid marshaling of the same parameters for
other nodes.
+ if (parametersMarshalled == null)
+ parametersMarshalled =
req.parametersMarshalled();
+ }
+ catch (Throwable e) {
+ qry.onResponse(nodeId, fragment.fragmentId(), ex =
e);
+ }
}
}
}
- }
-
- if (perfStatProc.enabled()) {
- perfStatProc.queryProperty(
- GridCacheQueryType.SQL_FIELDS,
- qry.initiatorNodeId(),
- qry.localQueryId(),
- "Query plan",
- plan.textPlan()
- );
- }
- if (ctx.query().runningQueryManager().planHistoryTracker().enabled()) {
- ctx.query().runningQueryManager().planHistoryTracker().addPlan(
- plan.textPlan(),
- qry.sql(),
- qry.context().schemaName(),
- qry.context().isLocal(),
- CalciteQueryEngineConfiguration.ENGINE_NAME
- );
- }
+ if (perfStatProc.enabled()) {
+ perfStatProc.queryProperty(
+ GridCacheQueryType.SQL_FIELDS,
+ qry.initiatorNodeId(),
+ qry.localQueryId(),
+ "Query plan",
+ plan.textPlan()
+ );
+ }
- QueryProperties qryProps = qry.context().unwrap(QueryProperties.class);
+ if
(ctx.query().runningQueryManager().planHistoryTracker().enabled()) {
+ ctx.query().runningQueryManager().planHistoryTracker().addPlan(
+ plan.textPlan(),
+ qry.sql(),
+ qry.context().schemaName(),
+ qry.context().isLocal(),
+ CalciteQueryEngineConfiguration.ENGINE_NAME
+ );
+ }
- Function<Object, Object> fieldConverter = (qryProps == null ||
qryProps.keepBinary()) ? null :
- o -> CacheObjectUtils.unwrapBinaryIfNeeded(objValCtx, o, false,
true, null);
+ Function<Object, Object> fieldConverter = keepBinary ? null :
+ o -> CacheObjectUtils.unwrapBinaryIfNeeded(objValCtx, o,
false, true, null);
+
+ HeavyQueriesTracker.ResultSetChecker resultSetChecker =
ctx.query().runningQueryManager()
+ .heavyQueriesTracker().resultSetChecker(qry);
+
+ Function<List<Object>, List<Object>> rowConverter;
+
+ // Fire EVT_CACHE_QUERY_OBJECT_READ on initiator node before
return result to cursor.
+ if (qryProps != null && qryProps.cacheName() != null &&
evtMgr.isRecordable(EVT_CACHE_QUERY_OBJECT_READ)) {
+ ClusterNode locNode = ctx.discovery().localNode();
+ UUID subjId = SecurityUtils.securitySubjectId(ctx);
+
+ rowConverter = row -> {
+ evtMgr.record(new CacheQueryReadEvent<>(
+ locNode,
+ "SQL fields query result set row read.",
+ EVT_CACHE_QUERY_OBJECT_READ,
+ CacheQueryType.SQL_FIELDS.name(),
+ qryProps.cacheName(),
+ null,
+ qry.sql(),
+ null,
+ null,
+ qry.parameters(),
+ subjId,
+ null,
+ null,
+ null,
+ null,
+ row));
+
+ resultSetChecker.checkOnFetchNext();
+
+ return row;
+ };
+ }
+ else {
+ rowConverter = row -> {
+ resultSetChecker.checkOnFetchNext();
- HeavyQueriesTracker.ResultSetChecker resultSetChecker =
ctx.query().runningQueryManager()
- .heavyQueriesTracker().resultSetChecker(qry);
+ return row;
+ };
+ }
- Function<List<Object>, List<Object>> rowConverter;
+ Runnable onClose = () -> {
+ if (udfQry) // Restore UDF queries limit.
+ udfQryLimit.getAndIncrement();
+
+ if (perfStatProc.enabled()) {
+ perfStatProc.queryRowsProcessed(
+ GridCacheQueryType.SQL_FIELDS,
+ qry.initiatorNodeId(),
+ qry.localQueryId(),
+ "Fetched",
+ resultSetChecker.fetchedSize()
+ );
+ }
- // Fire EVT_CACHE_QUERY_OBJECT_READ on initiator node before return
result to cursor.
- if (qryProps != null && qryProps.cacheName() != null &&
evtMgr.isRecordable(EVT_CACHE_QUERY_OBJECT_READ)) {
- ClusterNode locNode = ctx.discovery().localNode();
- UUID subjId = SecurityUtils.securitySubjectId(ctx);
+ resultSetChecker.checkOnClose();
+ };
- rowConverter = row -> {
- evtMgr.record(new CacheQueryReadEvent<>(
- locNode,
- "SQL fields query result set row read.",
- EVT_CACHE_QUERY_OBJECT_READ,
- CacheQueryType.SQL_FIELDS.name(),
- qryProps.cacheName(),
- null,
- qry.sql(),
- null,
- null,
- qry.parameters(),
- subjId,
- null,
- null,
- null,
- null,
- row));
+ Iterator<List<?>> it = iteratorsHolder().iterator(new
ConvertingClosableIterator<>(qry.iterator(), ectx,
+ fieldConverter, rowConverter, onClose));
- resultSetChecker.checkOnFetchNext();
+ // Make yet another tracking layer for cursor.getAll(), so
tracking hierarchy will look like:
+ // Row tracker -> Cursor memory tracker -> Query memory tracker ->
Global memory tracker.
+ // It's required, since query memory tracker can be closed
concurrently during getAll() and
+ // tracked data for cursor can be lost without additional tracker.
+ MemoryTracker curMemoryTracker =
QueryMemoryTracker.create(qryMemoryTracker, cfg.getQueryMemoryQuota());
- return row;
- };
+ return new ListFieldsQueryCursor<>(plan, it, ectx,
curMemoryTracker);
}
- else {
- rowConverter = row -> {
- resultSetChecker.checkOnFetchNext();
+ catch (Exception e) {
Review Comment:
is to cacth only checked exceptions is enough here ?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]