This is an automated email from the ASF dual-hosted git repository.
nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new f56ffcef59b IGNITE-23554 ScanQuery code cleanup (#11645)
f56ffcef59b is described below
commit f56ffcef59bb60c24185b4cde3624caa9964da2a
Author: Nikolay <[email protected]>
AuthorDate: Thu Nov 7 11:17:58 2024 +0300
IGNITE-23554 ScanQuery code cleanup (#11645)
---
.../query/calcite/CalciteQueryProcessor.java | 13 +-
.../managers/systemview/ScanQuerySystemView.java | 8 +-
.../processors/cache/query/CacheQuery.java | 1 -
.../cache/query/GridCacheQueryManager.java | 470 +--------------------
.../cache/query/GridCacheQueryRequest.java | 10 +-
.../processors/cache/query/ScanQueryIterator.java | 460 ++++++++++++++++++++
.../cache/transactions/IgniteTxManager.java | 12 +
.../odbc/jdbc/JdbcConnectionContext.java | 3 +-
.../platform/client/ClientConnectionContext.java | 3 +-
.../processors/query/GridQueryProcessor.java | 3 +-
.../apache/ignite/internal/util/IgniteUtils.java | 5 +
.../ignite/spi/systemview/view/ScanQueryView.java | 2 +-
12 files changed, 506 insertions(+), 484 deletions(-)
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
index a21ba720ced..8dbaf85ba70 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
@@ -58,8 +58,8 @@ import
org.apache.ignite.configuration.QueryEngineConfiguration;
import org.apache.ignite.events.SqlQueryExecutionEvent;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
-import
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
+import
org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.failure.FailureProcessor;
import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
@@ -119,10 +119,10 @@ import
org.apache.ignite.internal.processors.query.calcite.util.LifecycleAware;
import org.apache.ignite.internal.processors.query.calcite.util.Service;
import org.apache.ignite.internal.processors.security.SecurityUtils;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.IgniteSystemProperties.getLong;
-import static
org.apache.ignite.configuration.TransactionConfiguration.TX_AWARE_QUERIES_SUPPORTED_MODES;
import static org.apache.ignite.events.EventType.EVT_SQL_QUERY_EXECUTION;
/** */
@@ -605,7 +605,7 @@ public class CalciteQueryProcessor extends
GridProcessorAdapter implements Query
/** */
private void ensureTransactionModeSupported(@Nullable QueryContext qryCtx)
{
- if
(!ctx.config().getTransactionConfiguration().isTxAwareQueriesEnabled())
+ if (!U.isTxAwareQueriesEnabled(ctx))
return;
GridCacheVersion ver = queryTransactionVersion(qryCtx);
@@ -613,12 +613,7 @@ public class CalciteQueryProcessor extends
GridProcessorAdapter implements Query
if (ver == null)
return;
- final GridNearTxLocal userTx = ctx.cache().context().tm().tx(ver);
-
- if (TX_AWARE_QUERIES_SUPPORTED_MODES.contains(userTx.isolation()))
- return;
-
- throw new IllegalStateException("Transaction isolation mode not
supported for SQL queries: " + userTx.isolation());
+
IgniteTxManager.ensureTransactionModeSupported(ctx.cache().context().tm().tx(ver).isolation());
}
/** */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/systemview/ScanQuerySystemView.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/systemview/ScanQuerySystemView.java
index 8583d358be2..5beb99372ee 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/systemview/ScanQuerySystemView.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/systemview/ScanQuerySystemView.java
@@ -28,7 +28,7 @@ import org.apache.ignite.IgniteException;
import
org.apache.ignite.internal.managers.systemview.walker.ScanQueryViewWalker;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
-import
org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager.ScanQueryIterator;
+import org.apache.ignite.internal.processors.cache.query.ScanQueryIterator;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.spi.IgniteSpiCloseableIterator;
@@ -83,7 +83,7 @@ public class ScanQuerySystemView<K, V> extends
AbstractSystemView<ScanQueryView>
return new QueryDataIterator();
}
- /** Class to iterate through all {@link
GridCacheQueryManager.ScanQueryIterator}. */
+ /** Class to iterate through all {@link ScanQueryIterator}. */
private class QueryDataIterator implements Iterator<ScanQueryView> {
/** Cache contexts iterator. */
private final Iterator<GridCacheContext<K, V>> cctxsIter;
@@ -95,7 +95,7 @@ public class ScanQuerySystemView<K, V> extends
AbstractSystemView<ScanQueryView>
private Iterator<Map.Entry<UUID, GridCacheQueryManager<K,
V>.RequestFutureMap>> nodeQryIter;
/** Local query iterator. */
- private Iterator<GridCacheQueryManager.ScanQueryIterator> localQryIter;
+ private Iterator<ScanQueryIterator> localQryIter;
/** Current node id. */
private UUID nodeId;
@@ -173,7 +173,7 @@ public class ScanQuerySystemView<K, V> extends
AbstractSystemView<ScanQueryView>
}
/**
- * @return {@code True} if next {@link
GridCacheQueryManager.ScanQueryIterator} found.
+ * @return {@code True} if next {@link ScanQueryIterator} found.
*/
private boolean nextScanIter() {
try {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java
index 2d9474993d0..192c8f1b178 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java
@@ -785,7 +785,6 @@ public class CacheQuery<T> {
top.readLock();
try {
-
Collection<ClusterNode> affNodes = nodes(cctx, null, null);
List<ClusterNode> nodes = new ArrayList<>(affNodes);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 9d2e65ac5b6..0d386cd2c2c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -33,7 +33,6 @@ import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
@@ -46,7 +45,6 @@ import javax.cache.Cache;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
-import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.CacheEntry;
import org.apache.ignite.cache.QueryIndexType;
@@ -76,10 +74,7 @@ import
org.apache.ignite.internal.processors.cache.CacheObjectContext;
import org.apache.ignite.internal.processors.cache.CacheObjectUtils;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
-import
org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter;
-import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
@@ -89,13 +84,11 @@ import
org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import
org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor;
import
org.apache.ignite.internal.processors.datastructures.GridSetQueryPredicate;
import org.apache.ignite.internal.processors.datastructures.SetItemKey;
-import
org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryFilter;
import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
import org.apache.ignite.internal.processors.query.GridQueryIndexDescriptor;
import org.apache.ignite.internal.processors.query.GridQueryProcessor;
import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
import org.apache.ignite.internal.processors.query.QueryUtils;
-import org.apache.ignite.internal.processors.security.SecurityUtils;
import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.util.GridBoundedPriorityQueue;
import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
@@ -795,16 +788,13 @@ public abstract class GridCacheQueryManager<K, V> extends
GridCacheManagerAdapte
private GridCloseableIterator scanIterator(final CacheQuery<?> qry,
IgniteClosure transformer,
boolean locNode)
throws IgniteCheckedException {
- final InternalScanFilter<K, V> intFilter =
internalFilter(qry.scanFilter());
-
try {
Integer part = qry.partition();
if (part != null && (part < 0 || part >=
cctx.affinity().partitions()))
return new GridEmptyCloseableIterator() {
@Override public void close() throws
IgniteCheckedException {
- if (intFilter != null)
- intFilter.close();
+ ScanQueryIterator.closeFilter(qry.scanFilter());
super.close();
}
@@ -856,8 +846,7 @@ public abstract class GridCacheQueryManager<K, V> extends
GridCacheManagerAdapte
}
ScanQueryIterator iter = new ScanQueryIterator(it, qry, topVer,
locPart,
- intFilter,
- prepareTransformer(transformer),
+ transformer,
locNode, locNode ? locIters : null, cctx, log);
if (locNode) {
@@ -869,35 +858,7 @@ public abstract class GridCacheQueryManager<K, V> extends
GridCacheManagerAdapte
return iter;
}
catch (IgniteCheckedException | RuntimeException e) {
- if (intFilter != null)
- intFilter.close();
-
- throw e;
- }
- }
-
- /** */
- private @Nullable IgniteClosure<?, ?> prepareTransformer(IgniteClosure<?,
?> transformer) throws IgniteCheckedException {
- return SecurityUtils.sandboxedProxy(cctx.kernalContext(),
IgniteClosure.class, injectResources(transformer));
- }
-
- /** */
- private @Nullable InternalScanFilter<K, V>
internalFilter(IgniteBiPredicate<K, V> keyValFilter) throws
IgniteCheckedException {
- if (keyValFilter == null)
- return null;
-
- try {
- if (keyValFilter instanceof PlatformCacheEntryFilter)
- ((PlatformCacheEntryFilter)keyValFilter).cacheContext(cctx);
- else
- injectResources(keyValFilter);
-
- keyValFilter = SecurityUtils.sandboxedProxy(cctx.kernalContext(),
IgniteBiPredicate.class, keyValFilter);
-
- return new InternalScanFilter<>(keyValFilter);
- }
- catch (IgniteCheckedException | RuntimeException e) {
- InternalScanFilter.close(keyValFilter);
+ ScanQueryIterator.closeFilter(qry.scanFilter());
throw e;
}
@@ -907,7 +868,7 @@ public abstract class GridCacheQueryManager<K, V> extends
GridCacheManagerAdapte
* @param o Object to inject resources to.
* @throws IgniteCheckedException If failure occurred while injecting
resources.
*/
- private <R> R injectResources(@Nullable R o) throws IgniteCheckedException
{
+ static <R> R injectResources(@Nullable R o, GridCacheContext<?, ?> cctx)
throws IgniteCheckedException {
if (o == null)
return null;
@@ -952,7 +913,7 @@ public abstract class GridCacheQueryManager<K, V> extends
GridCacheManagerAdapte
try {
// Preparing query closures.
- final IgniteReducer<Object, Object> rdc =
injectResources((IgniteReducer<Object, Object>)qryInfo.reducer());
+ final IgniteReducer<Object, Object> rdc =
injectResources((IgniteReducer<Object, Object>)qryInfo.reducer(), cctx);
CacheQuery<?> qry = qryInfo.query();
@@ -1149,7 +1110,8 @@ public abstract class GridCacheQueryManager<K, V> extends
GridCacheManagerAdapte
CacheQuery<?> qry = qryInfo.query();
try {
- IgniteReducer<Cache.Entry<K, V>, Object> rdc =
injectResources((IgniteReducer<Cache.Entry<K, V>, Object>)qryInfo.reducer());
+ IgniteReducer<Cache.Entry<K, V>, Object> rdc =
+ injectResources((IgniteReducer<Cache.Entry<K, V>,
Object>)qryInfo.reducer(), cctx);
int pageSize = qry.pageSize();
@@ -1398,9 +1360,6 @@ public abstract class GridCacheQueryManager<K, V> extends
GridCacheManagerAdapte
final String namex = cctx.name();
- final InternalScanFilter<K, V> intFilter = qry.scanFilter() != null ?
- new InternalScanFilter<>(qry.scanFilter()) : null;
-
try {
assert qry.type() == SCAN;
@@ -1419,7 +1378,7 @@ public abstract class GridCacheQueryManager<K, V> extends
GridCacheManagerAdapte
namex,
null,
null,
- intFilter != null ? intFilter.scanFilter() : null,
+ qry.scanFilter(),
null,
null,
securitySubjectId(cctx),
@@ -1433,8 +1392,7 @@ public abstract class GridCacheQueryManager<K, V> extends
GridCacheManagerAdapte
return it;
}
catch (Exception e) {
- if (intFilter != null)
- intFilter.close();
+ ScanQueryIterator.closeFilter(qry.scanFilter());
if (updateStatistics)
cctx.queries().collectMetrics(GridCacheQueryType.SCAN, namex,
startTime,
@@ -3042,414 +3000,4 @@ public abstract class GridCacheQueryManager<K, V>
extends GridCacheManagerAdapte
}
}
- /** */
- public static final class ScanQueryIterator<K, V> extends
GridCloseableIteratorAdapter<Object> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- private final GridDhtCacheAdapter dht;
-
- /** */
- private final GridDhtLocalPartition locPart;
-
- /** */
- private final InternalScanFilter<K, V> intScanFilter;
-
- /** */
- private final boolean statsEnabled;
-
- /** */
- private final GridIterator<CacheDataRow> it;
-
- /** */
- private final GridCacheAdapter cache;
-
- /** */
- private final AffinityTopologyVersion topVer;
-
- /** */
- private final boolean keepBinary;
-
- /** */
- private final boolean readEvt;
-
- /** */
- private final String cacheName;
-
- /** */
- private final UUID subjId;
-
- /** */
- private final String taskName;
-
- /** */
- private final IgniteClosure transform;
-
- /** */
- private final CacheObjectContext objCtx;
-
- /** */
- private final GridCacheContext cctx;
-
- /** */
- private final IgniteLogger log;
-
- /** */
- private Object next;
-
- /** */
- private boolean needAdvance;
-
- /** */
- private IgniteCacheExpiryPolicy expiryPlc;
-
- /** */
- private final boolean locNode;
-
- /** */
- private final boolean incBackups;
-
- /** */
- private final long startTime;
-
- /** */
- private final int pageSize;
-
- /** */
- @Nullable private final GridConcurrentHashSet<ScanQueryIterator>
locIters;
-
- /**
- * @param it Iterator.
- * @param qry Query.
- * @param topVer Topology version.
- * @param locPart Local partition.
- * @param intScanFilter Internal scan filter.
- * @param transformer Transformer.
- * @param locNode Local node flag.
- * @param locIters Local iterators set.
- * @param cctx Cache context.
- * @param log Logger.
- */
- ScanQueryIterator(
- GridIterator<CacheDataRow> it,
- CacheQuery qry,
- AffinityTopologyVersion topVer,
- GridDhtLocalPartition locPart,
- InternalScanFilter<K, V> intScanFilter,
- IgniteClosure transformer,
- boolean locNode,
- @Nullable GridConcurrentHashSet<ScanQueryIterator> locIters,
- GridCacheContext cctx,
- IgniteLogger log) {
- assert !locNode || locIters != null : "Local iterators can't be
null for local query.";
-
- this.it = it;
- this.topVer = topVer;
- this.locPart = locPart;
- this.intScanFilter = intScanFilter;
- this.cctx = cctx;
-
- this.log = log;
- this.locNode = locNode;
- this.locIters = locIters;
-
- incBackups = qry.includeBackups();
-
- statsEnabled = cctx.statisticsEnabled();
-
- readEvt = cctx.events().isRecordable(EVT_CACHE_QUERY_OBJECT_READ)
&&
- cctx.gridEvents().hasListener(EVT_CACHE_QUERY_OBJECT_READ);
-
- taskName = readEvt ?
cctx.kernalContext().task().resolveTaskName(qry.taskHash()) : null;
-
- subjId = securitySubjectId(cctx);
-
- // keep binary for remote scans if possible
- keepBinary = (!locNode && intScanFilter == null && transformer ==
null && !readEvt) || qry.keepBinary();
- transform = transformer;
- dht = cctx.isNear() ? cctx.near().dht() : cctx.dht();
- cache = dht != null ? dht : cctx.cache();
- objCtx = cctx.cacheObjectContext();
- cacheName = cctx.name();
-
- needAdvance = true;
- expiryPlc = this.cctx.cache().expiryPolicy(null);
-
- startTime = U.currentTimeMillis();
- pageSize = qry.pageSize();
- }
-
- /** {@inheritDoc} */
- @Override protected Object onNext() {
- if (needAdvance)
- advance();
- else
- needAdvance = true;
-
- if (next == null)
- throw new NoSuchElementException();
-
- return next;
- }
-
- /** {@inheritDoc} */
- @Override protected boolean onHasNext() {
- if (needAdvance) {
- advance();
-
- needAdvance = false;
- }
-
- return next != null;
- }
-
- /** {@inheritDoc} */
- @Override protected void onClose() {
- if (expiryPlc != null && dht != null) {
- dht.sendTtlUpdateRequest(expiryPlc);
-
- expiryPlc = null;
- }
-
- if (locPart != null)
- locPart.release();
-
- if (intScanFilter != null)
- intScanFilter.close();
-
- if (locIters != null)
- locIters.remove(this);
- }
-
- /**
- * Moves the iterator to the next cache entry.
- */
- private void advance() {
- long start = statsEnabled ? System.nanoTime() : 0L;
-
- Object next0 = null;
-
- while (it.hasNext()) {
- CacheDataRow row = it.next();
-
- KeyCacheObject key = row.key();
- CacheObject val;
-
- if (expiryPlc != null) {
- try {
- CacheDataRow tmp = row;
-
- while (true) {
- try {
- GridCacheEntryEx entry = cache.entryEx(key);
-
- entry.unswap(tmp);
-
- val = entry.peek(true, true, topVer,
expiryPlc);
-
- entry.touch();
-
- break;
- }
- catch (GridCacheEntryRemovedException ignore) {
- tmp = null;
- }
- }
- }
- catch (IgniteCheckedException e) {
- if (log.isDebugEnabled())
- log.debug("Failed to peek value: " + e);
-
- val = null;
- }
-
- if (dht != null && expiryPlc.readyToFlush(100))
- dht.sendTtlUpdateRequest(expiryPlc);
- }
- else
- val = row.value();
-
- // Filter backups for SCAN queries, if it isn't partition scan.
- // Other types are filtered in indexing manager.
- if (!cctx.isReplicated() && /*qry.partition()*/this.locPart ==
null && !incBackups &&
- !cctx.affinity().primaryByKey(cctx.localNode(), key,
topVer)) {
- if (log.isDebugEnabled())
- log.debug("Ignoring backup element [row=" + row +
- ", cacheMode=" + cctx.config().getCacheMode() + ",
incBackups=" + incBackups +
- ", primary=" +
cctx.affinity().primaryByKey(cctx.localNode(), key, topVer) + ']');
-
- continue;
- }
-
- if (log.isDebugEnabled()) {
- ClusterNode primaryNode = cctx.affinity().primaryByKey(key,
- cctx.affinity().affinityTopologyVersion());
-
- log.debug(S.toString("Record",
- "key", key, true,
- "val", val, true,
- "incBackups", incBackups, false,
- "priNode", primaryNode != null ?
U.id8(primaryNode.id()) : null, false,
- "node", U.id8(cctx.localNode().id()), false));
- }
-
- if (val != null) {
- K key0 = (K)CacheObjectUtils.unwrapBinaryIfNeeded(objCtx,
key, keepBinary, false);
- V val0 = (V)CacheObjectUtils.unwrapBinaryIfNeeded(objCtx,
val, keepBinary, false);
-
- if (statsEnabled) {
- CacheMetricsImpl metrics = cctx.cache().metrics0();
-
- metrics.onRead(true);
-
- metrics.addGetTimeNanos(System.nanoTime() - start);
- }
-
- if (intScanFilter == null || intScanFilter.apply(key0,
val0)) {
- if (readEvt) {
- cctx.gridEvents().record(new CacheQueryReadEvent<>(
- cctx.localNode(),
- "Scan query entry read.",
- EVT_CACHE_QUERY_OBJECT_READ,
- CacheQueryType.SCAN.name(),
- cacheName,
- null,
- null,
- intScanFilter != null ?
intScanFilter.scanFilter() : null,
- null,
- null,
- subjId,
- taskName,
- key0,
- val0,
- null,
- null));
- }
-
- if (transform != null) {
- try {
- next0 = transform.apply(new
CacheQueryEntry<>(key0, val0));
- }
- catch (Throwable e) {
- throw new IgniteException(e);
- }
- }
- else
- next0 = !locNode ? new T2<>(key0, val0) :
- new CacheQueryEntry<>(key0, val0);
-
- break;
- }
- }
- }
-
- if ((this.next = next0) == null && expiryPlc != null && dht !=
null) {
- dht.sendTtlUpdateRequest(expiryPlc);
-
- expiryPlc = null;
- }
- }
-
- /** */
- @Nullable public IgniteBiPredicate<K, V> filter() {
- return intScanFilter == null ? null : intScanFilter.scanFilter;
- }
-
- /** */
- public AffinityTopologyVersion topVer() {
- return topVer;
- }
-
- /** */
- public GridDhtLocalPartition localPartition() {
- return locPart;
- }
-
- /** */
- public IgniteClosure transformer() {
- return transform;
- }
-
- /** */
- public long startTime() {
- return startTime;
- }
-
- /** */
- public boolean local() {
- return locNode;
- }
-
- /** */
- public boolean keepBinary() {
- return keepBinary;
- }
-
- /** */
- public UUID subjectId() {
- return subjId;
- }
-
- /** */
- public String taskName() {
- return taskName;
- }
-
- /** */
- public GridCacheContext cacheContext() {
- return cctx;
- }
-
- /** */
- public int pageSize() {
- return pageSize;
- }
- }
-
- /**
- * Wrap scan filter in order to catch unhandled errors.
- */
- private static class InternalScanFilter<K, V> implements
IgniteBiPredicate<K, V> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- private final IgniteBiPredicate<K, V> scanFilter;
-
- /**
- * @param scanFilter User scan filter.
- */
- InternalScanFilter(IgniteBiPredicate<K, V> scanFilter) {
- this.scanFilter = scanFilter;
- }
-
- /** {@inheritDoc} */
- @Override public boolean apply(K k, V v) {
- try {
- return scanFilter == null || scanFilter.apply(k, v);
- }
- catch (Throwable e) {
- throw new IgniteException(e);
- }
- }
-
- /** */
- void close() {
- close(scanFilter);
- }
-
- /** */
- static void close(IgniteBiPredicate<?, ?> scanFilter) {
- if (scanFilter instanceof PlatformCacheEntryFilter)
- ((PlatformCacheEntryFilter)scanFilter).onClose();
- }
-
- /**
- * @return Wrapped scan filter.
- */
- IgniteBiPredicate<K, V> scanFilter() {
- return scanFilter;
- }
- }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
index 3015d335a06..3129a991e81 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
@@ -452,20 +452,22 @@ public class GridCacheQueryRequest extends
GridCacheIdMessage implements GridCac
Marshaller mrsh = ctx.marshaller();
+ ClassLoader clsLdr = U.resolveClassLoader(ldr, ctx.gridConfig());
+
if (keyValFilterBytes != null && keyValFilter == null)
- keyValFilter = U.unmarshal(mrsh, keyValFilterBytes,
U.resolveClassLoader(ldr, ctx.gridConfig()));
+ keyValFilter = U.unmarshal(mrsh, keyValFilterBytes, clsLdr);
if (rdcBytes != null && rdc == null)
rdc = U.unmarshal(mrsh, rdcBytes, ldr);
if (transBytes != null && trans == null)
- trans = U.unmarshal(mrsh, transBytes, U.resolveClassLoader(ldr,
ctx.gridConfig()));
+ trans = U.unmarshal(mrsh, transBytes, clsLdr);
if (argsBytes != null && args == null)
- args = U.unmarshal(mrsh, argsBytes, U.resolveClassLoader(ldr,
ctx.gridConfig()));
+ args = U.unmarshal(mrsh, argsBytes, clsLdr);
if (idxQryDescBytes != null && idxQryDesc == null)
- idxQryDesc = U.unmarshal(mrsh, idxQryDescBytes,
U.resolveClassLoader(ldr, ctx.gridConfig()));
+ idxQryDesc = U.unmarshal(mrsh, idxQryDescBytes, clsLdr);
}
/** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/ScanQueryIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/ScanQueryIterator.java
new file mode 100644
index 00000000000..777b19f3eab
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/ScanQueryIterator.java
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query;
+
+import java.util.NoSuchElementException;
+import java.util.UUID;
+import javax.cache.Cache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.events.CacheQueryReadEvent;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheMetricsImpl;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.CacheObjectContext;
+import org.apache.ignite.internal.processors.cache.CacheObjectUtils;
+import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
+import
org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
+import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import
org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryFilter;
+import org.apache.ignite.internal.processors.security.SecurityUtils;
+import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.lang.GridIterator;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgniteClosure;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ;
+import static
org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager.injectResources;
+import static
org.apache.ignite.internal.processors.security.SecurityUtils.securitySubjectId;
+
+/** */
+public final class ScanQueryIterator<K, V, R> extends
GridCloseableIteratorAdapter<R> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private final GridDhtCacheAdapter<K, V> dht;
+
+ /** */
+ private final GridDhtLocalPartition locPart;
+
+ /** */
+ private final IgniteBiPredicate<K, V> filter;
+
+ /** */
+ private final Runnable closeFilterClo;
+
+ /** */
+ private final boolean statsEnabled;
+
+ /** */
+ private final GridIterator<CacheDataRow> it;
+
+ /** */
+ private final GridCacheAdapter<K, V> cache;
+
+ /** */
+ private final AffinityTopologyVersion topVer;
+
+ /** */
+ private final boolean keepBinary;
+
+ /** */
+ private final boolean readEvt;
+
+ /** */
+ private final String cacheName;
+
+ /** */
+ private final UUID subjId;
+
+ /** */
+ private final String taskName;
+
+ /** */
+ private final IgniteClosure<Cache.Entry<K, V>, R> transform;
+
+ /** */
+ private final CacheObjectContext objCtx;
+
+ /** */
+ private final GridCacheContext<K, V> cctx;
+
+ /** */
+ private final IgniteLogger log;
+
+ /** */
+ private R next;
+
+ /** */
+ private boolean needAdvance;
+
+ /** */
+ private IgniteCacheExpiryPolicy expiryPlc;
+
+ /** */
+ private final boolean locNode;
+
+ /** */
+ private final boolean incBackups;
+
+ /** */
+ private final long startTime;
+
+ /** */
+ private final int pageSize;
+
+ /** */
+ @Nullable private final GridConcurrentHashSet<ScanQueryIterator<K, V, R>>
locIters;
+
+ /**
+ * @param it Iterator.
+ * @param qry Query.
+ * @param topVer Topology version.
+ * @param locPart Local partition.
+ * @param transformer Transformer.
+ * @param locNode Local node flag.
+ * @param locIters Local iterators set.
+ * @param cctx Cache context.
+ * @param log Logger.
+ */
+ ScanQueryIterator(
+ GridIterator<CacheDataRow> it,
+ CacheQuery<R> qry,
+ AffinityTopologyVersion topVer,
+ GridDhtLocalPartition locPart,
+ IgniteClosure<Cache.Entry<K, V>, R> transformer,
+ boolean locNode,
+ @Nullable GridConcurrentHashSet<ScanQueryIterator<K, V, R>> locIters,
+ GridCacheContext<K, V> cctx,
+ IgniteLogger log) throws IgniteCheckedException {
+ assert !locNode || locIters != null : "Local iterators can't be null
for local query.";
+
+ this.it = it;
+ this.topVer = topVer;
+ this.locPart = locPart;
+ this.cctx = cctx;
+
+ this.log = log;
+ this.locNode = locNode;
+ this.locIters = locIters;
+
+ incBackups = qry.includeBackups();
+
+ statsEnabled = cctx.statisticsEnabled();
+
+ readEvt = cctx.events().isRecordable(EVT_CACHE_QUERY_OBJECT_READ) &&
+ cctx.gridEvents().hasListener(EVT_CACHE_QUERY_OBJECT_READ);
+
+ taskName = readEvt ?
cctx.kernalContext().task().resolveTaskName(qry.taskHash()) : null;
+
+ subjId = securitySubjectId(cctx);
+
+ dht = cctx.isNear() ? cctx.near().dht() : cctx.dht();
+ cache = dht != null ? dht : cctx.cache();
+ objCtx = cctx.cacheObjectContext();
+ cacheName = cctx.name();
+
+ needAdvance = true;
+ expiryPlc = this.cctx.cache().expiryPolicy(null);
+
+ startTime = U.currentTimeMillis();
+ pageSize = qry.pageSize();
+ transform = SecurityUtils.sandboxedProxy(cctx.kernalContext(),
IgniteClosure.class, injectResources(transformer, cctx));
+ closeFilterClo = qry.scanFilter() instanceof PlatformCacheEntryFilter
+ ? () -> closeFilter(qry.scanFilter())
+ : null;
+ filter = prepareFilter(qry.scanFilter());
+ // keep binary for remote scans if possible
+ keepBinary = (!locNode && filter == null && transformer == null &&
!readEvt) || qry.keepBinary();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected R onNext() {
+ if (needAdvance)
+ advance();
+ else
+ needAdvance = true;
+
+ if (next == null)
+ throw new NoSuchElementException();
+
+ return next;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected boolean onHasNext() {
+ if (needAdvance) {
+ advance();
+
+ needAdvance = false;
+ }
+
+ return next != null;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void onClose() {
+ // advance() nulls expiryPlc after sending the request on exhaustion, so this branch is skipped in that case.
+ if (expiryPlc != null && dht != null) {
+ dht.sendTtlUpdateRequest(expiryPlc);
+
+ expiryPlc = null;
+ }
+
+ // locPart is the partition passed to the constructor; it may be null.
+ if (locPart != null)
+ locPart.release();
+
+ // The constructor sets closeFilterClo only when the scan filter is a PlatformCacheEntryFilter.
+ if (closeFilterClo != null)
+ closeFilterClo.run();
+
+ // The constructor assertion requires locIters only for a local query, hence the null check.
+ if (locIters != null)
+ locIters.remove(this);
+ }
+
+ /**
+ * Moves the iterator to the next cache entry.
+ * <p>
+ * Reads rows from the underlying iterator until one has a non-null value, is not skipped by the backup
+ * check and passes the filter. The result (transformed if a transformer is set) is stored in {@code next}.
+ * When no such row is left, {@code next} becomes {@code null} and the TTL update request is sent.
+ *
+ * @throws IgniteException If the filter or the transformer throws.
+ */
+ private void advance() {
+ // Taken once per call and only when statistics are enabled; used for the get-time metric below.
+ long start = statsEnabled ? System.nanoTime() : 0L;
+
+ R next0 = null;
+
+ while (it.hasNext()) {
+ CacheDataRow row = it.next();
+
+ KeyCacheObject key = row.key();
+ CacheObject val;
+
+ // With an expiry policy the value is read through the cache entry (peek with the policy), not from the row.
+ if (expiryPlc != null) {
+ try {
+ CacheDataRow tmp = row;
+
+ // Retry with a freshly obtained entry whenever the current one turns out to be removed.
+ while (true) {
+ try {
+ GridCacheEntryEx entry = cache.entryEx(key);
+
+ entry.unswap(tmp);
+
+ val = entry.peek(true, true, topVer, expiryPlc);
+
+ entry.touch();
+
+ break;
+ }
+ catch (GridCacheEntryRemovedException ignore) {
+ // On retry unswap() receives null instead of the row read by the iterator.
+ tmp = null;
+ }
+ }
+ }
+ catch (IgniteCheckedException e) {
+ // A failed peek is only logged at debug level; the row is then skipped by the null-value check below.
+ if (log.isDebugEnabled())
+ log.debug("Failed to peek value: " + e);
+
+ val = null;
+ }
+
+ // Intermediate TTL flush. NOTE(review): 100 is presumably a pending-entries threshold - confirm.
+ if (dht != null && expiryPlc.readyToFlush(100))
+ dht.sendTtlUpdateRequest(expiryPlc);
+ }
+ else
+ val = row.value();
+
+ // Filter backups for SCAN queries, if it isn't partition scan.
+ // Other types are filtered in indexing manager.
+ if (!cctx.isReplicated() && /*qry.partition()*/this.locPart ==
null && !incBackups &&
+ !cctx.affinity().primaryByKey(cctx.localNode(), key, topVer)) {
+ if (log.isDebugEnabled())
+ log.debug("Ignoring backup element [row=" + row +
+ ", cacheMode=" + cctx.config().getCacheMode() + ",
incBackups=" + incBackups +
+ ", primary=" +
cctx.affinity().primaryByKey(cctx.localNode(), key, topVer) + ']');
+
+ continue;
+ }
+
+ // Debug output only. The primary node here is resolved for affinityTopologyVersion(), not for topVer.
+ if (log.isDebugEnabled()) {
+ ClusterNode primaryNode = cctx.affinity().primaryByKey(key,
+ cctx.affinity().affinityTopologyVersion());
+
+ log.debug(S.toString("Record",
+ "key", key, true,
+ "val", val, true,
+ "incBackups", incBackups, false,
+ "priNode", primaryNode != null ? U.id8(primaryNode.id()) :
null, false,
+ "node", U.id8(cctx.localNode().id()), false));
+ }
+
+ // Rows without a value are skipped.
+ if (val != null) {
+ K key0 = (K)CacheObjectUtils.unwrapBinaryIfNeeded(objCtx, key,
keepBinary, false);
+ V val0 = (V)CacheObjectUtils.unwrapBinaryIfNeeded(objCtx, val,
keepBinary, false);
+
+ // NOTE(review): 'start' is not reset between rows, so the time recorded for a row also covers
+ // rows processed earlier in the same advance() call.
+ if (statsEnabled) {
+ CacheMetricsImpl metrics = cctx.cache().metrics0();
+
+ metrics.onRead(true);
+
+ metrics.addGetTimeNanos(System.nanoTime() - start);
+ }
+
+ boolean passFilter;
+
+ // Anything thrown by the filter is rethrown wrapped into IgniteException.
+ try {
+ passFilter = filter == null || filter.apply(key0, val0);
+ }
+ catch (Throwable e) {
+ throw new IgniteException(e);
+ }
+
+ if (passFilter) {
+ // readEvt is computed in the constructor: the event type is recordable and has a listener.
+ if (readEvt) {
+ cctx.gridEvents().record(new CacheQueryReadEvent<>(
+ cctx.localNode(),
+ "Scan query entry read.",
+ EVT_CACHE_QUERY_OBJECT_READ,
+ CacheQueryType.SCAN.name(),
+ cacheName,
+ null,
+ null,
+ filter,
+ null,
+ null,
+ subjId,
+ taskName,
+ key0,
+ val0,
+ null,
+ null));
+ }
+
+ if (transform != null) {
+ // Anything thrown by the transformer is rethrown wrapped into IgniteException.
+ try {
+ next0 = transform.apply(new
CacheQueryEntry<>(key0, val0));
+ }
+ catch (Throwable e) {
+ throw new IgniteException(e);
+ }
+ }
+ else
+ // Without a transformer: T2 for a non-local query, CacheQueryEntry for a local one.
+ next0 = (R)(!locNode ? new T2<>(key0, val0) :
+ new CacheQueryEntry<>(key0, val0));
+
+ // First matching row found.
+ break;
+ }
+ }
+ }
+
+ // No result left: send the TTL update request and drop the policy, so that onClose() does not send it again.
+ if ((this.next = next0) == null && expiryPlc != null && dht != null) {
+ dht.sendTtlUpdateRequest(expiryPlc);
+
+ expiryPlc = null;
+ }
+ }
+
+ /**
+ * @return Filter produced by {@code prepareFilter} from the query scan filter in the constructor,
+ * or {@code null} if the query has no scan filter.
+ */
+ @Nullable public IgniteBiPredicate<K, V> filter() {
+ return filter;
+ }
+
+ /**
+ * @return Topology version this iterator was created with.
+ */
+ public AffinityTopologyVersion topVer() {
+ return topVer;
+ }
+
+ /**
+ * @return Local partition this iterator was created with; may be {@code null}.
+ * A non-null partition is released in {@code onClose()}.
+ */
+ public GridDhtLocalPartition localPartition() {
+ return locPart;
+ }
+
+ /**
+ * @return Transformer applied to entries that pass the filter, as stored by the constructor
+ * (sandboxed proxy of the resource-injected transformer). {@code advance()} treats {@code null} as "no transformer".
+ */
+ public IgniteClosure<Cache.Entry<K, V>, R> transformer() {
+ return transform;
+ }
+
+ /**
+ * @return Iterator creation time: {@code U.currentTimeMillis()} taken in the constructor.
+ */
+ public long startTime() {
+ return startTime;
+ }
+
+ /**
+ * @return Local node flag passed to the constructor. When {@code false}, untransformed results are
+ * produced as {@code T2} instead of {@code CacheQueryEntry}.
+ */
+ public boolean local() {
+ return locNode;
+ }
+
+ /**
+ * @return Keep binary flag computed in the constructor: {@code true} if the query requests it, or if the
+ * scan is not local and has no filter, no transformer and no read events.
+ */
+ public boolean keepBinary() {
+ return keepBinary;
+ }
+
+ /**
+ * @return Security subject ID resolved in the constructor via {@code securitySubjectId(cctx)}.
+ */
+ public UUID subjectId() {
+ return subjId;
+ }
+
+ /**
+ * @return Task name resolved from the query task hash in the constructor, or {@code null} when
+ * {@code EVT_CACHE_QUERY_OBJECT_READ} is not recordable or has no listener.
+ */
+ public String taskName() {
+ return taskName;
+ }
+
+ /**
+ * @return Cache context this iterator was created with.
+ */
+ public GridCacheContext<K, V> cacheContext() {
+ return cctx;
+ }
+
+ /**
+ * @return Page size taken from the query in the constructor ({@code qry.pageSize()}).
+ */
+ public int pageSize() {
+ return pageSize;
+ }
+
+ /**
+  * Prepares the scan filter for execution. A {@code PlatformCacheEntryFilter} receives the cache context,
+  * any other filter gets resources injected; the filter is then wrapped by {@code SecurityUtils.sandboxedProxy}.
+  * If preparation fails, the filter is closed via {@code closeFilter} before the exception is rethrown.
+  *
+  * @param filter Scan filter, possibly {@code null}.
+  * @return Prepared filter, or {@code null} if {@code filter} is {@code null}.
+  * @throws IgniteCheckedException If preparation fails.
+  */
+ private @Nullable IgniteBiPredicate<K, V> prepareFilter(IgniteBiPredicate<K, V> filter) throws IgniteCheckedException {
+     if (filter == null)
+         return null;
+
+     try {
+         if (!(filter instanceof PlatformCacheEntryFilter))
+             injectResources(filter, cctx);
+         else
+             ((PlatformCacheEntryFilter)filter).cacheContext(cctx);
+
+         return SecurityUtils.sandboxedProxy(cctx.kernalContext(), IgniteBiPredicate.class, filter);
+     }
+     catch (IgniteCheckedException | RuntimeException e) {
+         closeFilter(filter);
+
+         throw e;
+     }
+ }
+
+ /**
+  * Calls {@code onClose()} on the filter if it is a {@code PlatformCacheEntryFilter}.
+  * Does nothing for any other filter, including {@code null}.
+  *
+  * @param filter Filter to close.
+  */
+ public static void closeFilter(IgniteBiPredicate<?, ?> filter) {
+     if (!(filter instanceof PlatformCacheEntryFilter))
+         return;
+
+     ((PlatformCacheEntryFilter)filter).onClose();
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index e0f7d1c7389..3920585f00b 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -130,6 +130,7 @@ import static
org.apache.ignite.IgniteSystemProperties.IGNITE_LONG_OPERATIONS_DU
import static
org.apache.ignite.IgniteSystemProperties.IGNITE_MAX_COMPLETED_TX_COUNT;
import static
org.apache.ignite.IgniteSystemProperties.IGNITE_SLOW_TX_WARN_TIMEOUT;
import static
org.apache.ignite.IgniteSystemProperties.IGNITE_TX_DEADLOCK_DETECTION_MAX_ITERS;
+import static
org.apache.ignite.configuration.TransactionConfiguration.TX_AWARE_QUERIES_SUPPORTED_MODES;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
@@ -3594,4 +3595,15 @@ public class IgniteTxManager extends
GridCacheSharedManagerAdapter {
public void clearUncommitedStates() {
uncommitedTx = Collections.emptySet();
}
+
+ /**
+  * Ensures that the given transaction isolation is one of the modes supported for transaction aware queries.
+  *
+  * @param isolation Transaction isolation to check.
+  * @throws IllegalStateException If {@code isolation} is not contained in {@code TX_AWARE_QUERIES_SUPPORTED_MODES}.
+  */
+ public static void ensureTransactionModeSupported(TransactionIsolation isolation) {
+     if (!TX_AWARE_QUERIES_SUPPORTED_MODES.contains(isolation))
+         throw new IllegalStateException("Transaction isolation mode not supported for SQL queries: " + isolation);
+ }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java
index 935e2091510..a9eb55043cc 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java
@@ -40,6 +40,7 @@ import
org.apache.ignite.internal.processors.query.QueryEngineConfigurationEx;
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.nio.GridNioSession;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.jetbrains.annotations.Nullable;
@@ -206,7 +207,7 @@ public class JdbcConnectionContext extends
ClientListenerAbstractConnectionConte
features = JdbcThinFeature.enumSet(cliFeatures);
- if
(!ctx.config().getTransactionConfiguration().isTxAwareQueriesEnabled())
+ if (!U.isTxAwareQueriesEnabled(ctx))
features.remove(JdbcThinFeature.TX_AWARE_QUERIES);
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientConnectionContext.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientConnectionContext.java
index 5314e1cfdf2..c4d4dc59861 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientConnectionContext.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientConnectionContext.java
@@ -37,6 +37,7 @@ import
org.apache.ignite.internal.processors.odbc.ClientListenerProtocolVersion;
import org.apache.ignite.internal.processors.odbc.ClientListenerRequestHandler;
import
org.apache.ignite.internal.processors.platform.client.tx.ClientTxContext;
import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
import static
org.apache.ignite.internal.processors.odbc.ClientListenerNioListener.THIN_CLIENT;
@@ -195,7 +196,7 @@ public class ClientConnectionContext extends
ClientListenerAbstractConnectionCon
features = ClientBitmaskFeature.enumSet(cliFeatures);
- if
(!ctx.config().getTransactionConfiguration().isTxAwareQueriesEnabled())
+ if (!U.isTxAwareQueriesEnabled(ctx))
features.remove(ClientBitmaskFeature.TX_AWARE_QUERIES);
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index fed8ec75d92..9847d95169a 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -349,8 +349,7 @@ public class GridQueryProcessor extends
GridProcessorAdapter {
initQueryEngines();
idxBuildStatusStorage = new IndexBuildStatusStorage(ctx);
-
- txAwareQueriesEnabled =
ctx.config().getTransactionConfiguration().isTxAwareQueriesEnabled();
+ txAwareQueriesEnabled = U.isTxAwareQueriesEnabled(ctx);
}
/** {@inheritDoc} */
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 82470199935..28a43fee3db 100755
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -12536,4 +12536,9 @@ public abstract class IgniteUtils {
return sb.toString();
}
+
+ /**
+ * @param kctx Kernal context.
+ * @return Value of {@code isTxAwareQueriesEnabled()} of the transaction configuration taken from the
+ * configuration of the given context.
+ */
+ public static boolean isTxAwareQueriesEnabled(GridKernalContext kctx) {
+ return
kctx.config().getTransactionConfiguration().isTxAwareQueriesEnabled();
+ }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/systemview/view/ScanQueryView.java
b/modules/core/src/main/java/org/apache/ignite/spi/systemview/view/ScanQueryView.java
index 571acd93039..35adca4dae8 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/systemview/view/ScanQueryView.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/systemview/view/ScanQueryView.java
@@ -20,7 +20,7 @@ package org.apache.ignite.spi.systemview.view;
import java.util.UUID;
import org.apache.ignite.internal.managers.systemview.walker.Order;
import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
-import
org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager.ScanQueryIterator;
+import org.apache.ignite.internal.processors.cache.query.ScanQueryIterator;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteBiTuple;