Repository: ignite Updated Branches: refs/heads/master 33f485aec -> a164296b1
IGNITE-8892 Fixed OOME when scan query is used for a big partition - Fixes #4391. Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a164296b Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a164296b Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a164296b Branch: refs/heads/master Commit: a164296b1a9f4bf98de837953586a52295b82507 Parents: 33f485a Author: Evgeny Stanilovskiy <estanilovs...@gridgain.com> Authored: Mon Jul 23 11:56:21 2018 +0300 Committer: Alexey Goncharuk <alexey.goncha...@gmail.com> Committed: Mon Jul 23 11:56:21 2018 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 1 - .../processors/cache/query/CacheQuery.java | 14 +---- .../query/GridCacheDistributedQueryManager.java | 1 - .../cache/query/GridCacheQueryAdapter.java | 20 ------- .../query/GridCacheQueryFutureAdapter.java | 9 +--- .../service/GridServiceProcessor.java | 2 - .../cache/CacheIteratorScanQueryTest.java | 55 +++++++++++++++++++- 7 files changed, 56 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/a164296b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 4b8644e..c59e84e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -4001,7 +4001,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V final CacheOperationContext opCtx = ctx.operationContextPerCall(); final GridCloseableIterator<Map.Entry<K, V>> iter = ctx0.queries().createScanQuery(p, null, keepBinary) - .keepAll(false) .executeScanQuery(); return ctx.itHolder().iterator(iter, new CacheIteratorConverter<Cache.Entry<K, V>, Map.Entry<K, V>>() { http://git-wip-us.apache.org/repos/asf/ignite/blob/a164296b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java index 0cd01fb..d8eb7ca 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java @@ -206,15 +206,6 @@ public interface CacheQuery<T> { public CacheQuery<T> timeout(long timeout); /** - * Sets whether or not to keep all query results local. If not - only the current page - * is kept locally. Default value is {@code true}. - * - * @param keepAll Keep results or not. - * @return {@code this} query instance for chaining. - */ - public CacheQuery<T> keepAll(boolean keepAll); - - /** * Sets whether or not to include backup entries into query result. This flag * is {@code false} by default. * @@ -245,10 +236,7 @@ public interface CacheQuery<T> { * Executes the query and returns the query future. Caller may decide to iterate * over the returned future directly in which case the iterator may block until * the next value will become available, or wait for the whole query to finish - * by calling any of the {@code 'get(..)'} methods on the returned future. If - * {@link #keepAll(boolean)} flag is set to {@code false}, then {@code 'get(..)'} - * methods will only return the last page received, otherwise all pages will be - * accumulated and returned to user as a collection. + * by calling any of the {@code 'get(..)'} methods on the returned future. * <p> * Note that if the passed in grid projection is a local node, then query * will be executed locally without distribution to other nodes. http://git-wip-us.apache.org/repos/asf/ignite/blob/a164296b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java index b9c24a3..aac1659 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java @@ -267,7 +267,6 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage log, req.pageSize(), 0, - false, req.includeBackups(), false, null, http://git-wip-us.apache.org/repos/asf/ignite/blob/a164296b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java index b5fdd23..51fdd58 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java @@ -110,9 +110,6 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { private volatile long timeout; /** */ - private volatile boolean keepAll = true; - - /** */ private volatile boolean incBackups; /** */ @@ -205,7 +202,6 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { * @param log Logger. * @param pageSize Page size. * @param timeout Timeout. - * @param keepAll Keep all flag. * @param incBackups Include backups flag. * @param dedup Enable dedup flag. * @param prj Grid projection. @@ -223,7 +219,6 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { IgniteLogger log, int pageSize, long timeout, - boolean keepAll, boolean incBackups, boolean dedup, ClusterGroup prj, @@ -240,7 +235,6 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { this.log = log; this.pageSize = pageSize; this.timeout = timeout; - this.keepAll = keepAll; this.incBackups = incBackups; this.dedup = dedup; this.prj = prj; @@ -352,20 +346,6 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { } /** {@inheritDoc} */ - @Override public CacheQuery<T> keepAll(boolean keepAll) { - this.keepAll = keepAll; - - return this; - } - - /** - * @return Keep all flag. - */ - public boolean keepAll() { - return keepAll; - } - - /** {@inheritDoc} */ @Override public CacheQuery<T> includeBackups(boolean incBackups) { this.incBackups = incBackups; http://git-wip-us.apache.org/repos/asf/ignite/blob/a164296b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java index c418ca2..9a5dd26 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java @@ -76,9 +76,6 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda private final Queue<Collection<R>> queue = new LinkedList<>(); /** */ - private final Collection<Object> allCol = new LinkedList<>(); - - /** */ private final AtomicInteger cnt = new AtomicInteger(); /** */ @@ -403,11 +400,8 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda synchronized (this) { enqueue(data); - if (qry.query().keepAll()) - allCol.addAll(maskNulls((Collection<Object>)data)); - if (onPage(nodeId, finished)) { - onDone((Collection<R>)(qry.query().keepAll() ? unmaskNulls(allCol) : data)); + onDone(/* data */); clear(); } @@ -580,7 +574,6 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda public void printMemoryStats() { X.println(">>> Query future memory statistics."); X.println(">>> queueSize: " + queue.size()); - X.println(">>> allCollSize: " + allCol.size()); X.println(">>> keysSize: " + keys.size()); X.println(">>> cnt: " + cnt); } http://git-wip-us.apache.org/repos/asf/ignite/blob/a164296b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java index 04c50ac..0f29791 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java @@ -1527,8 +1527,6 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite CacheQuery<Map.Entry<Object, Object>> qry = qryMgr.createScanQuery(p, null, false); - qry.keepAll(false); - DiscoveryDataClusterState clusterState = ctx.state().clusterState(); if ((clusterState.hasBaselineTopology() http://git-wip-us.apache.org/repos/asf/ignite/blob/a164296b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheIteratorScanQueryTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheIteratorScanQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheIteratorScanQueryTest.java index 951cddf..c6cd87b 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheIteratorScanQueryTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheIteratorScanQueryTest.java @@ -19,12 +19,19 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.cache.query.ScanQuery; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import javax.cache.Cache; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; + import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.cache.CacheMode.REPLICATED; @@ -44,6 +51,11 @@ public class CacheIteratorScanQueryTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { super.beforeTest(); @@ -94,6 +106,47 @@ public class CacheIteratorScanQueryTest extends GridCommonAbstractTest { } /** + * @throws Exception If failed. + */ + public void testQueryGetAllClientSide() throws Exception { + Ignite server = startGrid(0); + + IgniteCache<Integer, Integer> cache = server.getOrCreateCache(DEFAULT_CACHE_NAME); + + client = true; + + Ignite client = startGrid(1); + + IgniteCache<Integer, Integer> cliCache = client.cache(DEFAULT_CACHE_NAME); + + for (int i = 0; i < 100_000; i++) + cache.put(i, i); + + ScanQuery<Integer, Integer> qry = new ScanQuery<>(); + + qry.setPageSize(100); + + try (QueryCursor<Cache.Entry<Integer, Integer>> cur = cliCache.query(qry)) { + List<Cache.Entry<Integer, Integer>> res = cur.getAll(); + + assertEquals(100_000, res.size()); + + Collections.sort(res, (e1, e2) -> { + return e1.getKey().compareTo(e2.getKey()); + }); + + int exp = 0; + + for (Cache.Entry<Integer, Integer> e : res) { + assertEquals(exp, e.getKey().intValue()); + assertEquals(exp, e.getValue().intValue()); + + exp++; + } + } + } + + /** * Return always false. */ public static class AlwaysFalseCacheFilter implements IgnitePredicate<ClusterNode> { @@ -102,4 +155,4 @@ public class CacheIteratorScanQueryTest extends GridCommonAbstractTest { return false; } } -} \ No newline at end of file +}