This is an automated email from the ASF dual-hosted git repository. ilyak pushed a commit to branch ignite-2.9 in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-2.9 by this push: new 8faaae9 IGNITE-13212 Fixed p2p deployment of scan queries transformer class - Fixes #8000. 8faaae9 is described below commit 8faaae93199555ee170ffecc356ebde3b5dc93f8 Author: Sergey Chugunov <schugu...@gridgain.com> AuthorDate: Tue Jul 7 16:07:07 2020 +0300 IGNITE-13212 Fixed p2p deployment of scan queries transformer class - Fixes #8000. Signed-off-by: Ilya Kasnacheev <ilya.kasnach...@gmail.com> --- .../query/GridCacheDistributedQueryManager.java | 5 +- ...GridP2PComputeWithNestedEntryProcessorTest.java | 4 +- .../p2p/GridP2PScanQueryWithTransformerTest.java | 406 +++++++++++++++++++++ .../ignite/testsuites/IgniteP2PSelfTestSuite.java | 4 +- .../tests/p2p/cache/ScanQueryTestTransformer.java | 42 +++ .../p2p/cache/ScanQueryTestTransformerWrapper.java | 41 +++ 6 files changed, 498 insertions(+), 4 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java index b03909f..3abebf4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java @@ -539,6 +539,9 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage Boolean dataPageScanEnabled = qry.query().isDataPageScanEnabled(); MvccSnapshot mvccSnapshot = qry.query().mvccSnapshot(); + boolean deployFilterOrTransformer = (qry.query().scanFilter() != null || qry.query().transform() != null) + && cctx.gridDeploy().enabled(); + final GridCacheQueryRequest req = new GridCacheQueryRequest( cctx.cacheId(), reqId, @@ -562,7 +565,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage queryTopologyVersion(), mvccSnapshot, // Force deployment anyway if scan query is used. - cctx.deploymentEnabled() || (qry.query().scanFilter() != null && cctx.gridDeploy().enabled()), + cctx.deploymentEnabled() || deployFilterOrTransformer, dataPageScanEnabled); addQueryFuture(req.id(), fut); diff --git a/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PComputeWithNestedEntryProcessorTest.java b/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PComputeWithNestedEntryProcessorTest.java index f3b1731..4959c0d 100644 --- a/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PComputeWithNestedEntryProcessorTest.java +++ b/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PComputeWithNestedEntryProcessorTest.java @@ -136,7 +136,7 @@ public class GridP2PComputeWithNestedEntryProcessorTest extends GridCommonAbstra assertTrue(key >= ENTRIES || res); } - scnaCacheData(cache); + scanCacheData(cache); } } } @@ -183,7 +183,7 @@ public class GridP2PComputeWithNestedEntryProcessorTest extends GridCommonAbstra * @param cache Ignite cache. * @throws Exception If failed. */ - private void scnaCacheData(IgniteCache cache) throws Exception { + private void scanCacheData(IgniteCache cache) throws Exception { scanByCopositeFirstPredicate(cache); scanByCopositeSecondPredicate(cache); scanByCopositeFirstSecondPredicate(cache); diff --git a/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PScanQueryWithTransformerTest.java b/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PScanQueryWithTransformerTest.java new file mode 100644 index 0000000..28a1e6f --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PScanQueryWithTransformerTest.java @@ -0,0 +1,406 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.p2p; + +import java.lang.reflect.Constructor; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.List; +import javax.cache.Cache; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.cache.query.ScanQuery; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.lang.IgniteClosure; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.ListeningTestLogger; +import org.apache.ignite.testframework.LogListener; +import org.apache.ignite.testframework.config.GridTestProperties; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; + +/** + * + */ +public class GridP2PScanQueryWithTransformerTest extends GridCommonAbstractTest { + /** Test class loader. */ + private static final ClassLoader TEST_CLASS_LOADER; + + /** Name of explicit class used as a Transformer for Scan Query. */ + private static final String TRANSFORMER_CLASS_NAME = "org.apache.ignite.tests.p2p.cache.ScanQueryTestTransformer"; + + /** Name of class-wrapper for anonymous class used as a Transformer for Scan Query. */ + private static final String TRANSFORMER_CLO_WRAPPER_CLASS_NAME = + "org.apache.ignite.tests.p2p.cache.ScanQueryTestTransformerWrapper"; + + /** */ + private static final int SCALE_FACTOR = 7; + + /** */ + private static final int CACHE_SIZE = 10; + + /** Initialize ClassLoader. */ + static { + try { + TEST_CLASS_LOADER = new URLClassLoader( + new URL[] {new URL(GridTestProperties.getProperty("p2p.uri.cls"))}, + GridP2PScanQueryWithTransformerTest.class.getClassLoader()); + } + catch (MalformedURLException e) { + throw new RuntimeException("Define property p2p.uri.cls", e); + } + } + + /** */ + private boolean p2pEnabled; + + /** */ + private ClassLoader clsLoader; + + /** */ + private IgniteLogger logger; + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setPeerClassLoadingEnabled(p2pEnabled); + cfg.setClassLoader(clsLoader); + if (logger != null) + cfg.setGridLogger(logger); + + return cfg; + } + + /** + * Verifies that Scan Query Transformer is loaded by p2p mechanism <b>from client node</b> + * when it is missing on server's classpath. + * + * Scan Query result set is examined by iteration over Cursor. + * + * @throws Exception If failed. + */ + @Test + public void testScanQueryCursorFromClientNodeWithExplicitClass() throws Exception { + p2pEnabled = true; + + executeP2PClassLoadingEnabledTest(true); + } + + /** + * Verifies that Scan Query Transformer is loaded by p2p mechanism <b>from another server node</b> + * when it is missing on server's classpath. + * + * Scan Query result set is examined by iteration over Cursor. + * + * @throws Exception If failed. + */ + @Test + public void testScanQueryCursorFromServerNodeWithExplicitClass() throws Exception { + p2pEnabled = true; + + executeP2PClassLoadingEnabledTest(false); + } + + /** + * Verifies that Scan Query Transformer is loaded by p2p mechanism <b>from client node</b> + * when it is missing on server's classpath. + * + * Scan Query result set is examined by getAll call from Cursor.. + * + * @throws Exception If failed. + */ + @Test + public void testScanQueryGetAllFromClientNodeWithExplicitClass() throws Exception { + p2pEnabled = true; + + IgniteEx ig0 = startGrid(0); + + IgniteCache<Integer, Integer> cache = ig0.createCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME)); + + populateCache(cache); + + IgniteEx client = startClientGrid(1); + + IgniteCache<Object, Object> clientCache = client.getOrCreateCache(DEFAULT_CACHE_NAME); + + QueryCursor<Integer> query = clientCache.query(new ScanQuery<Integer, Integer>(), loadTransformerClass()); + + List<Integer> results = query.getAll(); + + assertNotNull(results); + assertEquals(CACHE_SIZE, results.size()); + } + + /** + * Verifies that Scan Query Transformer is loaded by p2p mechanism <b>from client node</b> + * when it is missing on server's classpath. + * + * Transformer class is implemented as an anonymous instance of {@link IgniteClosure} interface. + * + * @throws Exception If failed. + */ + @Test + public void testScanQueryCursorFromClientNodeWithAnonymousClass() throws Exception { + p2pEnabled = true; + + IgniteEx ig0 = startGrid(0); + + IgniteCache<Integer, Integer> cache = ig0.createCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME)); + + int sumPopulated = populateCache(cache); + + IgniteEx client = startClientGrid(1); + + IgniteCache<Object, Object> clientCache = client.getOrCreateCache(DEFAULT_CACHE_NAME); + + QueryCursor<Integer> query = clientCache.query(new ScanQuery<Integer, Integer>(), loadTransformerClosure()); + + int sumQueried = 0; + + for (Integer val : query) + sumQueried += val; + + assertTrue(sumQueried == sumPopulated * SCALE_FACTOR); + } + + /** + * Verifies that execution <b>from client node</b> of Scan Query with Transformer + * fails if Transformer class is missing on server node and p2p class loading is disabled. + * + * @throws Exception If failed. + */ + @Test + public void testScanQueryFromClientFailsIfP2PClassLoadingIsDisabled() throws Exception { + p2pEnabled = false; + + executeP2PClassLoadingDisabledTest(true); + } + + /** + * Verifies that execution <b>from server node</b> of Scan Query with Transformer + * fails if Transformer class is missing on server node and p2p class loading is disabled. + * + * @throws Exception If failed. + */ + @Test + public void testScanQueryFromServerFailsIfP2PClassLoadingIsDisabled() throws Exception { + p2pEnabled = false; + + executeP2PClassLoadingDisabledTest(false); + } + + /** + * Verifies that deployment isn't needed if Transformer class is available on classpath of all nodes. + * + * @throws Exception If failed. + */ + @Test + public void testSharedTransformerWorksWhenP2PIsDisabled() throws Exception { + p2pEnabled = false; + + IgniteEx ig0 = startGrid(0); + + IgniteCache<Integer, Integer> cache = ig0.createCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME)); + + populateCache(cache); + + IgniteEx client = startClientGrid(1); + + IgniteCache<Object, Object> clientCache = client.getOrCreateCache(DEFAULT_CACHE_NAME); + + QueryCursor<Integer> query = clientCache.query(new ScanQuery<Integer, Integer>(), + new SharedTransformer(SCALE_FACTOR)); + + List<Integer> results = query.getAll(); + + assertNotNull(results); + assertEquals(CACHE_SIZE, results.size()); + } + + /** + * Executes scenario with successful p2p loading of Transformer class + * with client or server node sending Scan Query request and iterating over result set. + * + * @param withClientNode Flag to execute scan query from client or server node. + * @throws Exception If failed. + */ + private void executeP2PClassLoadingEnabledTest(boolean withClientNode) throws Exception { + ListeningTestLogger listeningLogger = new ListeningTestLogger(); + LogListener clsDeployedMsgLsnr = LogListener.matches( + "Class was deployed in SHARED or CONTINUOUS mode: " + + "class org.apache.ignite.tests.p2p.cache.ScanQueryTestTransformer") + .build(); + listeningLogger.registerListener(clsDeployedMsgLsnr); + logger = listeningLogger; + + IgniteEx ig0 = startGrid(0); + + logger = null; + + IgniteCache<Integer, Integer> cache = ig0.createCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME)); + + int sumPopulated = populateCache(cache); + + IgniteEx requestingNode; + + if (withClientNode) + requestingNode = startClientGrid(1); + else { + clsLoader = TEST_CLASS_LOADER; + requestingNode = startGrid(1); + } + + IgniteCache<Object, Object> reqNodeCache = requestingNode.getOrCreateCache(DEFAULT_CACHE_NAME); + + QueryCursor<Integer> query = reqNodeCache.query(new ScanQuery<Integer, Integer>(), loadTransformerClass()); + + int sumQueried = 0; + + for (Integer val : query) + sumQueried += val; + + assertTrue(sumQueried == sumPopulated * SCALE_FACTOR); + assertTrue(clsDeployedMsgLsnr.check()); + } + + /** + * Executes scenario with p2p loading of Transformer class failed + * with client or server node sending Scan Query request. + * + * @param withClientNode Flag to execute scan query from client or server node. + * @throws Exception If test scenario failed. + */ + private void executeP2PClassLoadingDisabledTest(boolean withClientNode) throws Exception { + ListeningTestLogger listeningLogger = new ListeningTestLogger(); + LogListener clsDeployedMsgLsnr = LogListener.matches( + "Class was deployed in SHARED or CONTINUOUS mode: " + + "class org.apache.ignite.tests.p2p.cache.ScanQueryTestTransformerWrapper") + .build(); + listeningLogger.registerListener(clsDeployedMsgLsnr); + logger = listeningLogger; + + IgniteEx ig0 = startGrid(0); + + logger = null; + + IgniteCache<Integer, Integer> cache = ig0.createCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME)); + + populateCache(cache); + + IgniteEx requestNode; + + if (withClientNode) + requestNode = startClientGrid(1); + else { + clsLoader = TEST_CLASS_LOADER; + requestNode = startGrid(1); + } + + IgniteCache<Object, Object> reqNodeCache = requestNode.getOrCreateCache(DEFAULT_CACHE_NAME); + + QueryCursor<Integer> query = reqNodeCache.query(new ScanQuery<Integer, Integer>(), loadTransformerClosure()); + + try { + List<Integer> all = query.getAll(); + } + catch (Exception e) { + //No-op. + + checkTopology(2); + + assertFalse(clsDeployedMsgLsnr.check()); + + return; + } + + fail("Expected exception on executing scan query hasn't been not thrown."); + } + + /** + * @param cache Cache to populate. + */ + private int populateCache(IgniteCache cache) { + int sum = 0; + + for (int i = 0; i < CACHE_SIZE; i++) { + sum += i; + + cache.put(i, i); + } + + return sum; + } + + /** + * Loads class for query transformer from another package so server doesn't have access to it. + * + * @return Instance of transformer class. + * @throws Exception If load has failed. + */ + private IgniteClosure loadTransformerClass() throws Exception { + Constructor ctor = TEST_CLASS_LOADER.loadClass(TRANSFORMER_CLASS_NAME).getConstructor(int.class); + + return (IgniteClosure)ctor.newInstance(SCALE_FACTOR); + } + + /** + * Loads anonymous class for query transformer from another package + * so executing server node doesn't have access to it. + * + * @return Instance of anonymous class implementing {@link IgniteClosure} to be used as a transformer. + * @throws Exception If load has failed. + */ + private IgniteClosure loadTransformerClosure() throws Exception { + Constructor<?> ctor = TEST_CLASS_LOADER.loadClass(TRANSFORMER_CLO_WRAPPER_CLASS_NAME).getConstructor(int.class); + + Object wrapper = ctor.newInstance(SCALE_FACTOR); + + return GridTestUtils.getFieldValue(wrapper, "clo"); + } + + /** */ + private static final class SharedTransformer implements IgniteClosure<Cache.Entry<Integer, Integer>, Integer> { + /** */ + private final int scaleFactor; + + /** */ + private SharedTransformer(int factor) { + scaleFactor = factor; + } + + /** {@inheritDoc} */ + @Override public Integer apply(Cache.Entry<Integer, Integer> entry) { + return entry.getValue() * scaleFactor; + } + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteP2PSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteP2PSelfTestSuite.java index 9fdc3fb..a054104 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteP2PSelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteP2PSelfTestSuite.java @@ -33,6 +33,7 @@ import org.apache.ignite.p2p.GridP2PNodeLeftSelfTest; import org.apache.ignite.p2p.GridP2PRecursionTaskSelfTest; import org.apache.ignite.p2p.GridP2PRemoteClassLoadersSelfTest; import org.apache.ignite.p2p.GridP2PSameClassLoaderSelfTest; +import org.apache.ignite.p2p.GridP2PScanQueryWithTransformerTest; import org.apache.ignite.p2p.GridP2PTimeoutSelfTest; import org.apache.ignite.p2p.GridP2PUndeploySelfTest; import org.apache.ignite.p2p.P2PScanQueryUndeployTest; @@ -67,7 +68,8 @@ import org.junit.runners.Suite; P2PScanQueryUndeployTest.class, GridDeploymentMessageCountSelfTest.class, GridP2PComputeWithNestedEntryProcessorTest.class, - GridP2PCountTiesLoadClassDirectlyFromClassLoaderTest.class + GridP2PCountTiesLoadClassDirectlyFromClassLoaderTest.class, + GridP2PScanQueryWithTransformerTest.class }) public class IgniteP2PSelfTestSuite { } diff --git a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/cache/ScanQueryTestTransformer.java b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/cache/ScanQueryTestTransformer.java new file mode 100644 index 0000000..8647f6c --- /dev/null +++ b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/cache/ScanQueryTestTransformer.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.tests.p2p.cache; + +import javax.cache.Cache; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.query.Query; +import org.apache.ignite.lang.IgniteClosure; + +/** + * Test class to verify p2p class loading for transformer of Scan Query + * (see {@link IgniteCache#query(Query, IgniteClosure)}). + */ +public class ScanQueryTestTransformer implements IgniteClosure<Cache.Entry<Integer, Integer>, Integer> { + /** */ + private final int scaleFactor; + + /** */ + public ScanQueryTestTransformer(int scaleFactor) { + this.scaleFactor = scaleFactor; + } + + /** {@inheritDoc} */ + @Override public Integer apply(Cache.Entry<Integer, Integer> entry) { + return entry.getValue() * scaleFactor; + } +} diff --git a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/cache/ScanQueryTestTransformerWrapper.java b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/cache/ScanQueryTestTransformerWrapper.java new file mode 100644 index 0000000..18152b5 --- /dev/null +++ b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/cache/ScanQueryTestTransformerWrapper.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.tests.p2p.cache; + +import javax.cache.Cache; +import org.apache.ignite.lang.IgniteClosure; + +/** */ +public class ScanQueryTestTransformerWrapper { + /** */ + private final int scaleFactor; + + /** */ + private final IgniteClosure<Cache.Entry<Integer, Integer>, Integer> clo = new IgniteClosure<Cache.Entry<Integer, Integer>, Integer>() { + @Override public Integer apply(Cache.Entry<Integer, Integer> entry) { + return entry.getValue() * scaleFactor; + } + }; + + /** + * @param scaleFactor Scale factor. + */ + public ScanQueryTestTransformerWrapper(int scaleFactor) { + this.scaleFactor = scaleFactor; + } +}