IGNITE-1168 Added support for metadata and scan commands in REST.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/12235254 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/12235254 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/12235254 Branch: refs/heads/ignite-843 Commit: 1223525478a80f527abcce7327a4d8b92c8b085a Parents: a1e5cc5 Author: Andrey <anovi...@gridgain.com> Authored: Mon Oct 12 15:51:48 2015 +0700 Committer: Andrey <anovi...@gridgain.com> Committed: Mon Oct 12 15:51:48 2015 +0700 ---------------------------------------------------------------------- .../JettyRestProcessorAbstractSelfTest.java | 225 ++++++++ .../discovery/GridDiscoveryManager.java | 515 ++++++++++++++++++- .../processors/cache/GridCacheProcessor.java | 48 +- .../cache/query/GridCacheQueryManager.java | 74 ++- .../cache/query/GridCacheSqlIndexMetadata.java | 7 +- .../cache/query/GridCacheSqlMetadata.java | 22 +- .../processors/rest/GridRestCommand.java | 8 +- .../processors/rest/GridRestProcessor.java | 63 ++- .../handlers/cache/GridCacheCommandHandler.java | 362 +++++++------ .../handlers/query/QueryCommandHandler.java | 195 +++++-- .../top/GridTopologyCommandHandler.java | 160 +++++- .../rest/request/RestQueryRequest.java | 175 +++++++ .../rest/request/RestSqlQueryRequest.java | 125 ----- .../http/jetty/GridJettyJsonConfig.java | 158 +++++- .../http/jetty/GridJettyRestHandler.java | 186 ++++--- 15 files changed, 1849 insertions(+), 474 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/12235254/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java 
b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java index ac0edff..bb6e67e 100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java @@ -24,6 +24,7 @@ import java.io.Serializable; import java.net.URL; import java.net.URLConnection; import java.net.URLEncoder; +import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -31,14 +32,24 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.regex.Pattern; +import net.sf.json.JSONNull; import net.sf.json.JSONObject; import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.CachePeekMode; import org.apache.ignite.cache.query.SqlQuery; import org.apache.ignite.cache.query.annotations.QuerySqlField; +import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.internal.processors.cache.GridCacheProcessor; +import org.apache.ignite.internal.processors.cache.IgniteCacheProxy; +import org.apache.ignite.internal.processors.cache.IgniteInternalCache; +import org.apache.ignite.internal.processors.cache.query.GridCacheSqlIndexMetadata; +import org.apache.ignite.internal.processors.cache.query.GridCacheSqlMetadata; import org.apache.ignite.internal.processors.rest.handlers.GridRestCommandHandler; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.testframework.GridTestUtils; import static org.apache.ignite.IgniteSystemProperties.IGNITE_JETTY_PORT; @@ -907,6 +918,106 @@ public abstract class 
JettyRestProcessorAbstractSelfTest extends AbstractRestPro } /** + * @param meta Metadata for Ignite cache. + * @throws Exception If failed. + */ + private void testMetadata(GridCacheSqlMetadata meta) throws Exception { + Map<String, String> params = F.asMap("cmd", GridRestCommand.CACHE_METADATA.key()); + + if (meta.cacheName() != null) + params.put("cacheName", meta.cacheName()); + + String ret = content(params); + + assertNotNull(ret); + assertTrue(!ret.isEmpty()); + + info("Cache metadata result: " + ret); + + jsonEquals(ret, pattern("\\{.+\\}", true)); + + Map res = (Map)JSONObject.fromObject(ret).get("response"); + + Collection types = (Collection)res.get("types"); + + assertNotNull(types); + assertEqualsCollections(meta.types(), types); + + Map keyClasses = (Map)res.get("keyClasses"); + + assertNotNull(keyClasses); + assertTrue(meta.keyClasses().equals(keyClasses)); + + Map valClasses = (Map)res.get("valClasses"); + + assertNotNull(valClasses); + assertTrue(meta.valClasses().equals(valClasses)); + + Map fields = (Map)res.get("fields"); + + assertNotNull(fields); + assertTrue(meta.fields().equals(fields)); + + Map indexesByType = (Map)res.get("indexes"); + + assertNotNull(indexesByType); + assertEquals(meta.indexes().size(), indexesByType.size()); + + for (Map.Entry<String, Collection<GridCacheSqlIndexMetadata>> metaIndexes : meta.indexes().entrySet()) { + Collection<Map> indexes = (Collection<Map>)indexesByType.get(metaIndexes.getKey()); + + assertNotNull(indexes); + assertEquals(metaIndexes.getValue().size(), indexes.size()); + + for (final GridCacheSqlIndexMetadata metaIdx : metaIndexes.getValue()) { + Map idx = F.find(indexes, null, new IgnitePredicate<Map>() { + @Override public boolean apply(Map map) { + return metaIdx.name().equals(map.get("name")); + } + }); + + assertNotNull(idx); + + assertEqualsCollections(metaIdx.fields(), (Collection)idx.get("fields")); + assertEqualsCollections(metaIdx.descendings(), (Collection)idx.get("descendings")); + 
assertEquals(metaIdx.unique(), idx.get("unique")); + } + } + } + + /** + * @throws Exception If failed. + */ + public void testMetadataLocal() throws Exception { + GridCacheProcessor cacheProc = grid(0).context().cache(); + + for (IgniteInternalCache<?, ?> cache : cacheProc.caches()) { + if (CU.isSystemCache(cache.name())) + continue; + + GridCacheSqlMetadata meta = F.first(cache.context().queries().sqlMetadata()); + + testMetadata(meta); + } + } + + /** + * @throws Exception If failed. + */ + public void testMetadataRemote() throws Exception { + CacheConfiguration<Integer, String> partialCacheCfg = new CacheConfiguration<>("partial"); + + partialCacheCfg.setIndexedTypes(Integer.class, String.class); + partialCacheCfg.setNodeFilter(new NodeIdFilter(grid(1).localNode().id())); + + IgniteCacheProxy<Integer, String> c = (IgniteCacheProxy<Integer, String>)grid(1).createCache(partialCacheCfg); + + GridCacheSqlMetadata meta = F.first(c.context().queries().sqlMetadata()); + + testMetadata(meta); + } + + /** * @throws Exception If failed. */ public void testTopology() throws Exception { @@ -918,6 +1029,23 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro info("Topology command result: " + ret); jsonEquals(ret, pattern("\\[\\{.+\\}\\]", true)); + + JSONObject json = JSONObject.fromObject(ret); + + Collection<Map> nodes = (Collection)json.get("response"); + + assertEquals(GRID_CNT, nodes.size()); + + for (Map node : nodes) { + assertEquals(JSONNull.getInstance(), node.get("attributes")); + assertEquals(JSONNull.getInstance(), node.get("metrics")); + + assertEquals("PARTITIONED", node.get("defaultCacheMode")); + + Map caches = (Map)node.get("caches"); + + assertEquals(F.asMap("person", "PARTITIONED"), caches); + } } /** @@ -1056,6 +1184,75 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro /** * @throws Exception If failed. 
*/ + public void testQueryScan() throws Exception { + Map<String, String> params = new HashMap<>(); + params.put("cmd", GridRestCommand.EXECUTE_SCAN_QUERY.key()); + params.put("pageSize", "10"); + params.put("cacheName", "person"); + + String ret = content(params); + + assertNotNull(ret); + assertTrue(!ret.isEmpty()); + + JSONObject json = JSONObject.fromObject(ret); + + List items = (List)((Map)json.get("response")).get("items"); + + assertEquals(4, items.size()); + + assertFalse(queryCursorFound()); + } + + /** + * @throws Exception If failed. + */ + public void testFilterQueryScan() throws Exception { + Map<String, String> params = new HashMap<>(); + params.put("cmd", GridRestCommand.EXECUTE_SCAN_QUERY.key()); + params.put("pageSize", "10"); + params.put("cacheName", "person"); + params.put("classname", ScanFilter.class.getName()); + + String ret = content(params); + + assertNotNull(ret); + assertTrue(!ret.isEmpty()); + + JSONObject json = JSONObject.fromObject(ret); + + List items = (List)((Map)json.get("response")).get("items"); + + assertEquals(2, items.size()); + + assertFalse(queryCursorFound()); + } + + /** + * @throws Exception If failed. + */ + public void testIncorrectFilterQueryScan() throws Exception { + Map<String, String> params = new HashMap<>(); + params.put("cmd", GridRestCommand.EXECUTE_SCAN_QUERY.key()); + params.put("pageSize", "10"); + params.put("cacheName", "person"); + params.put("classname", ScanFilter.class.getName() + 1); + + String ret = content(params); + + assertNotNull(ret); + assertTrue(!ret.isEmpty()); + + JSONObject json = JSONObject.fromObject(ret); + + String err = (String)json.get("error"); + + assertTrue(err.contains("Failed to find target class")); + } + + /** + * @throws Exception If failed. 
+ */ public void testQuery() throws Exception { grid(0).cache(null).put("1", "1"); grid(0).cache(null).put("2", "2"); @@ -1323,4 +1520,32 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro return id; } } + + /** + * Test filter for scan query. + */ + public static class ScanFilter implements IgniteBiPredicate<Integer, Person> { + /** {@inheritDoc} */ + @Override public boolean apply(Integer integer, Person person) { + return person.salary > 1000; + } + } + + /** Filter by node ID. */ + private static class NodeIdFilter implements IgnitePredicate<ClusterNode> { + /** */ + private final UUID nid; + + /** + * @param nid Node ID where cache should be started. + */ + NodeIdFilter(UUID nid) { + this.nid = nid; + } + + /** {@inheritDoc} */ + @Override public boolean apply(ClusterNode n) { + return n.id().equals(nid); + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/12235254/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 2ed4520..9e54f6f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -56,6 +56,7 @@ import org.apache.ignite.IgniteClientDisconnectedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteInterruptedException; import org.apache.ignite.cache.CacheMetrics; +import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cluster.ClusterMetrics; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.DiscoveryEvent; @@ -299,16 +300,16 @@ public class 
GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @param cacheName Cache name. * @param filter Cache filter. * @param nearEnabled Near enabled flag. - * @param loc {@code True} if cache is local. + * @param cacheMode Cache mode. */ public void setCacheFilter( String cacheName, IgnitePredicate<ClusterNode> filter, boolean nearEnabled, - boolean loc + CacheMode cacheMode ) { if (!registeredCaches.containsKey(cacheName)) - registeredCaches.put(cacheName, new CachePredicate(filter, nearEnabled, loc)); + registeredCaches.put(cacheName, new CachePredicate(filter, nearEnabled, cacheMode)); } /** @@ -1592,6 +1593,25 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } /** + * @param node Node to check. + * @return Cache names accessible on the given node. + */ + public Map<String, CacheMode> nodeCaches(ClusterNode node) { + Map<String, CacheMode> caches = U.newHashMap(registeredCaches.size()); + + for (Map.Entry<String, CachePredicate> entry : registeredCaches.entrySet()) { + String cacheName = entry.getKey(); + + CachePredicate pred = entry.getValue(); + + if (pred != null && pred.cacheNode(node)) + caches.put(cacheName, pred.cacheMode); + } + + return caches; + } + + /** * Checks if cache with given name has at least one node with near cache enabled. * * @param cacheName Cache name. @@ -2822,7 +2842,484 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } } - /** + /** Cache for discovery collections. */ + private class DiscoCache { + /** Remote nodes. */ + private final List<ClusterNode> rmtNodes; + + /** All nodes. */ + private final List<ClusterNode> allNodes; + + /** All nodes with at least one cache configured. */ + @GridToStringInclude + private final Collection<ClusterNode> allNodesWithCaches; + + /** All nodes with at least one cache configured. */ + @GridToStringInclude + private final Collection<ClusterNode> rmtNodesWithCaches; + + /** Cache nodes by cache name. 
*/ + @GridToStringInclude + private final Map<String, Collection<ClusterNode>> allCacheNodes; + + /** Remote cache nodes by cache name. */ + @GridToStringInclude + private final Map<String, Collection<ClusterNode>> rmtCacheNodes; + + /** Cache nodes by cache name. */ + @GridToStringInclude + private final Map<String, Collection<ClusterNode>> affCacheNodes; + + /** Caches where at least one node has near cache enabled. */ + @GridToStringInclude + private final Set<String> nearEnabledCaches; + + /** Nodes grouped by version. */ + private final NavigableMap<IgniteProductVersion, Collection<ClusterNode>> nodesByVer; + + /** Daemon nodes. */ + private final List<ClusterNode> daemonNodes; + + /** Node map. */ + private final Map<UUID, ClusterNode> nodeMap; + + /** Local node. */ + private final ClusterNode loc; + + /** Highest node order. */ + private final long maxOrder; + + /** + * Cached alive nodes list. As long as this collection doesn't accept {@code null}s use {@link + * #maskNull(String)} before passing raw cache names to it. + */ + private final ConcurrentMap<String, Collection<ClusterNode>> aliveCacheNodes; + + /** + * Cached alive remote nodes list. As long as this collection doesn't accept {@code null}s use {@link + * #maskNull(String)} before passing raw cache names to it. + */ + private final ConcurrentMap<String, Collection<ClusterNode>> aliveRmtCacheNodes; + + /** + * Cached alive remote nodes with caches. + */ + private final Collection<ClusterNode> aliveNodesWithCaches; + + /** + * Cached alive server remote nodes with caches. + */ + private final Collection<ClusterNode> aliveSrvNodesWithCaches; + + /** + * Cached alive remote server nodes with caches. + */ + private final Collection<ClusterNode> aliveRmtSrvNodesWithCaches; + + /** + * @param loc Local node. + * @param rmts Remote nodes. 
+ */ + private DiscoCache(ClusterNode loc, Collection<ClusterNode> rmts) { + this.loc = loc; + + rmtNodes = Collections.unmodifiableList(new ArrayList<>(F.view(rmts, daemonFilter))); + + assert !rmtNodes.contains(loc) : "Remote nodes collection shouldn't contain local node" + + " [rmtNodes=" + rmtNodes + ", loc=" + loc + ']'; + + List<ClusterNode> all = new ArrayList<>(rmtNodes.size() + 1); + + if (!loc.isDaemon()) + all.add(loc); + + all.addAll(rmtNodes); + + Collections.sort(all, GridNodeOrderComparator.INSTANCE); + + allNodes = Collections.unmodifiableList(all); + + Map<String, Collection<ClusterNode>> cacheMap = new HashMap<>(allNodes.size(), 1.0f); + Map<String, Collection<ClusterNode>> rmtCacheMap = new HashMap<>(allNodes.size(), 1.0f); + Map<String, Collection<ClusterNode>> dhtNodesMap = new HashMap<>(allNodes.size(), 1.0f); + Collection<ClusterNode> nodesWithCaches = new HashSet<>(allNodes.size()); + Collection<ClusterNode> rmtNodesWithCaches = new HashSet<>(allNodes.size()); + + aliveCacheNodes = new ConcurrentHashMap8<>(allNodes.size(), 1.0f); + aliveRmtCacheNodes = new ConcurrentHashMap8<>(allNodes.size(), 1.0f); + aliveNodesWithCaches = new ConcurrentSkipListSet<>(); + aliveSrvNodesWithCaches = new ConcurrentSkipListSet<>(); + aliveRmtSrvNodesWithCaches = new ConcurrentSkipListSet<>(); + nodesByVer = new TreeMap<>(); + + long maxOrder0 = 0; + + Set<String> nearEnabledSet = new HashSet<>(); + + for (ClusterNode node : allNodes) { + assert node.order() != 0 : "Invalid node order [locNode=" + loc + ", node=" + node + ']'; + + if (node.order() > maxOrder0) + maxOrder0 = node.order(); + + boolean hasCaches = false; + + for (Map.Entry<String, CachePredicate> entry : registeredCaches.entrySet()) { + String cacheName = entry.getKey(); + + CachePredicate filter = entry.getValue(); + + if (filter.cacheNode(node)) { + nodesWithCaches.add(node); + + if (!loc.id().equals(node.id())) + rmtNodesWithCaches.add(node); + + addToMap(cacheMap, cacheName, node); + + if 
(alive(node.id())) + addToMap(aliveCacheNodes, maskNull(cacheName), node); + + if (filter.dataNode(node)) + addToMap(dhtNodesMap, cacheName, node); + + if (filter.nearNode(node)) + nearEnabledSet.add(cacheName); + + if (!loc.id().equals(node.id())) { + addToMap(rmtCacheMap, cacheName, node); + + if (alive(node.id())) + addToMap(aliveRmtCacheNodes, maskNull(cacheName), node); + } + + hasCaches = true; + } + } + + if (hasCaches) { + if (alive(node.id())) { + aliveNodesWithCaches.add(node); + + if (!CU.clientNode(node)) { + aliveSrvNodesWithCaches.add(node); + + if (!loc.id().equals(node.id())) + aliveRmtSrvNodesWithCaches.add(node); + } + } + } + + IgniteProductVersion nodeVer = U.productVersion(node); + + // Create collection for this version if it does not exist. + Collection<ClusterNode> nodes = nodesByVer.get(nodeVer); + + if (nodes == null) { + nodes = new ArrayList<>(allNodes.size()); + + nodesByVer.put(nodeVer, nodes); + } + + nodes.add(node); + } + + // Need second iteration to add this node to all previous node versions. + for (ClusterNode node : allNodes) { + IgniteProductVersion nodeVer = U.productVersion(node); + + // Get all versions lower or equal node's version. 
+ NavigableMap<IgniteProductVersion, Collection<ClusterNode>> updateView = + nodesByVer.headMap(nodeVer, false); + + for (Collection<ClusterNode> prevVersions : updateView.values()) + prevVersions.add(node); + } + + maxOrder = maxOrder0; + + allCacheNodes = Collections.unmodifiableMap(cacheMap); + rmtCacheNodes = Collections.unmodifiableMap(rmtCacheMap); + affCacheNodes = Collections.unmodifiableMap(dhtNodesMap); + allNodesWithCaches = Collections.unmodifiableCollection(nodesWithCaches); + this.rmtNodesWithCaches = Collections.unmodifiableCollection(rmtNodesWithCaches); + nearEnabledCaches = Collections.unmodifiableSet(nearEnabledSet); + + daemonNodes = Collections.unmodifiableList(new ArrayList<>( + F.view(F.concat(false, loc, rmts), F0.not(daemonFilter)))); + + Map<UUID, ClusterNode> nodeMap = new HashMap<>(allNodes().size() + daemonNodes.size(), 1.0f); + + for (ClusterNode n : F.concat(false, allNodes(), daemonNodes())) + nodeMap.put(n.id(), n); + + this.nodeMap = nodeMap; + } + + /** + * Adds node to map. + * + * @param cacheMap Map to add to. + * @param cacheName Cache name. + * @param rich Node to add + */ + private void addToMap(Map<String, Collection<ClusterNode>> cacheMap, String cacheName, ClusterNode rich) { + Collection<ClusterNode> cacheNodes = cacheMap.get(cacheName); + + if (cacheNodes == null) { + cacheNodes = new ArrayList<>(allNodes.size()); + + cacheMap.put(cacheName, cacheNodes); + } + + cacheNodes.add(rich); + } + + /** @return Local node. */ + ClusterNode localNode() { + return loc; + } + + /** @return Remote nodes. */ + Collection<ClusterNode> remoteNodes() { + return rmtNodes; + } + + /** @return All nodes. */ + Collection<ClusterNode> allNodes() { + return allNodes; + } + + /** + * Gets collection of nodes which have version equal or greater than {@code ver}. + * + * @param ver Version to check. + * @return Collection of nodes with version equal or greater than {@code ver}. 
+ */ + Collection<ClusterNode> elderNodes(IgniteProductVersion ver) { + Map.Entry<IgniteProductVersion, Collection<ClusterNode>> entry = nodesByVer.ceilingEntry(ver); + + if (entry == null) + return Collections.emptyList(); + + return entry.getValue(); + } + + /** + * @return Versions map. + */ + NavigableMap<IgniteProductVersion, Collection<ClusterNode>> versionsMap() { + return nodesByVer; + } + + /** + * Gets collection of nodes with at least one cache configured. + * + * @param topVer Topology version (maximum allowed node order). + * @return Collection of nodes. + */ + Collection<ClusterNode> allNodesWithCaches(final long topVer) { + return filter(topVer, allNodesWithCaches); + } + + /** + * Gets all nodes that have cache with given name. + * + * @param cacheName Cache name. + * @param topVer Topology version. + * @return Collection of nodes. + */ + Collection<ClusterNode> cacheNodes(@Nullable String cacheName, final long topVer) { + return filter(topVer, allCacheNodes.get(cacheName)); + } + + /** + * Gets all remote nodes that have cache with given name. + * + * @param cacheName Cache name. + * @param topVer Topology version. + * @return Collection of nodes. + */ + Collection<ClusterNode> remoteCacheNodes(@Nullable String cacheName, final long topVer) { + return filter(topVer, rmtCacheNodes.get(cacheName)); + } + + /** + * Gets all remote nodes that have at least one cache configured. + * + * @param topVer Topology version. + * @return Collection of nodes. + */ + Collection<ClusterNode> remoteCacheNodes(final long topVer) { + return filter(topVer, rmtNodesWithCaches); + } + + /** + * Gets all nodes that have cache with given name and should participate in affinity calculation. With + * partitioned cache nodes with near-only cache do not participate in affinity node calculation. + * + * @param cacheName Cache name. + * @param topVer Topology version. + * @return Collection of nodes. 
+ */ + Collection<ClusterNode> cacheAffinityNodes(@Nullable String cacheName, final long topVer) { + return filter(topVer, affCacheNodes.get(cacheName)); + } + + /** + * Gets all alive nodes that have cache with given name. + * + * @param cacheName Cache name. + * @param topVer Topology version. + * @return Collection of nodes. + */ + Collection<ClusterNode> aliveCacheNodes(@Nullable String cacheName, final long topVer) { + return filter(topVer, aliveCacheNodes.get(maskNull(cacheName))); + } + + /** + * Gets all alive remote nodes that have cache with given name. + * + * @param cacheName Cache name. + * @param topVer Topology version. + * @return Collection of nodes. + */ + Collection<ClusterNode> aliveRemoteCacheNodes(@Nullable String cacheName, final long topVer) { + return filter(topVer, aliveRmtCacheNodes.get(maskNull(cacheName))); + } + + /** + * Gets all alive remote server nodes with at least one cache configured. + * + * @param topVer Topology version. + * @return Collection of nodes. + */ + Collection<ClusterNode> aliveRemoteServerNodesWithCaches(final long topVer) { + return filter(topVer, aliveRmtSrvNodesWithCaches); + } + + /** + * Gets all alive server nodes with at least one cache configured. + * + * @param topVer Topology version. + * @return Collection of nodes. + */ + Collection<ClusterNode> aliveServerNodesWithCaches(final long topVer) { + return filter(topVer, aliveSrvNodesWithCaches); + } + + /** + * Gets all alive remote nodes with at least one cache configured. + * + * @param topVer Topology version. + * @return Collection of nodes. + */ + Collection<ClusterNode> aliveNodesWithCaches(final long topVer) { + return filter(topVer, aliveNodesWithCaches); + } + + /** + * Checks if cache with given name has at least one node with near cache enabled. + * + * @param cacheName Cache name. + * @return {@code True} if cache with given name has at least one node with near cache enabled. 
+ */ + boolean hasNearCache(@Nullable String cacheName) { + return nearEnabledCaches.contains(cacheName); + } + + /** + * Removes left node from cached alives lists. + * + * @param leftNode Left node. + */ + void updateAlives(ClusterNode leftNode) { + if (leftNode.order() > maxOrder) + return; + + filterNodeMap(aliveCacheNodes, leftNode); + + filterNodeMap(aliveRmtCacheNodes, leftNode); + + aliveNodesWithCaches.remove(leftNode); + aliveSrvNodesWithCaches.remove(leftNode); + aliveRmtSrvNodesWithCaches.remove(leftNode); + } + + /** + * Creates a copy of nodes map without the given node. + * + * @param map Map to copy. + * @param exclNode Node to exclude. + */ + private void filterNodeMap(ConcurrentMap<String, Collection<ClusterNode>> map, final ClusterNode exclNode) { + for (String cacheName : registeredCaches.keySet()) { + String maskedName = maskNull(cacheName); + + while (true) { + Collection<ClusterNode> oldNodes = map.get(maskedName); + + if (oldNodes == null || oldNodes.isEmpty()) + break; + + Collection<ClusterNode> newNodes = new ArrayList<>(oldNodes); + + if (!newNodes.remove(exclNode)) + break; + + if (map.replace(maskedName, oldNodes, newNodes)) + break; + } + } + } + + /** + * Replaces {@code null} with {@code NULL_CACHE_NAME}. + * + * @param cacheName Cache name. + * @return Masked name. + */ + private String maskNull(@Nullable String cacheName) { + return cacheName == null ? NULL_CACHE_NAME : cacheName; + } + + /** + * @param topVer Topology version. + * @param nodes Nodes. + * @return Filtered collection (potentially empty, but never {@code null}). + */ + private Collection<ClusterNode> filter(final long topVer, @Nullable Collection<ClusterNode> nodes) { + if (nodes == null) + return Collections.emptyList(); + + // If no filtering needed, return original collection. + return nodes.isEmpty() || topVer < 0 || topVer >= maxOrder ? 
+ nodes : + F.view(nodes, new P1<ClusterNode>() { + @Override public boolean apply(ClusterNode node) { + return node.order() <= topVer; + } + }); + } + + /** @return Daemon nodes. */ + Collection<ClusterNode> daemonNodes() { + return daemonNodes; + } + + /** + * @param id Node ID. + * @return Node. + */ + @Nullable ClusterNode node(UUID id) { + return nodeMap.get(id); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(DiscoCache.class, this, "allNodesWithDaemons", U.toShortString(allNodes)); + } + } /** * Cache predicate. */ private static class CachePredicate { @@ -2832,8 +3329,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { /** If near cache is enabled on data nodes. */ private final boolean nearEnabled; - /** Flag indicating if cache is local. */ - private final boolean loc; + /** Cache mode. */ + private final CacheMode cacheMode; /** Collection of client near nodes. */ private final ConcurrentHashMap<UUID, Boolean> clientNodes; @@ -2841,14 +3338,14 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { /** * @param cacheFilter Cache filter. * @param nearEnabled Near enabled flag. - * @param loc {@code True} if cache is local. + * @param cacheMode Cache mode. 
*/ - private CachePredicate(IgnitePredicate<ClusterNode> cacheFilter, boolean nearEnabled, boolean loc) { + private CachePredicate(IgnitePredicate<ClusterNode> cacheFilter, boolean nearEnabled, CacheMode cacheMode) { assert cacheFilter != null; this.cacheFilter = cacheFilter; this.nearEnabled = nearEnabled; - this.loc = loc; + this.cacheMode = cacheMode; clientNodes = new ConcurrentHashMap<>(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/12235254/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index daa4475..736e630 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -689,7 +689,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { cfg.getName(), cfg.getNodeFilter(), cfg.getNearConfiguration() != null && cfg.getCacheMode() == PARTITIONED, - cfg.getCacheMode() == LOCAL); + cfg.getCacheMode()); ctx.discovery().addClientNode(cfg.getName(), ctx.localNodeId(), @@ -1941,7 +1941,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { req.cacheName(), ccfg.getNodeFilter(), ccfg.getNearConfiguration() != null, - ccfg.getCacheMode() == LOCAL); + ccfg.getCacheMode()); } } else { @@ -1968,7 +1968,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { req.cacheName(), ccfg.getNodeFilter(), ccfg.getNearConfiguration() != null, - ccfg.getCacheMode() == LOCAL); + ccfg.getCacheMode()); } } } @@ -2468,7 +2468,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { ccfg.getName(), ccfg.getNodeFilter(), ccfg.getNearConfiguration() != null, - 
ccfg.getCacheMode() == LOCAL); + ccfg.getCacheMode()); ctx.discovery().addClientNode(req.cacheName(), req.initiatingNodeId(), @@ -3482,6 +3482,46 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** * */ + @SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor") + private class TemplateConfigurationFuture extends GridFutureAdapter<Object> { + /** Start ID. */ + @GridToStringInclude + private IgniteUuid deploymentId; + + /** Cache name. */ + private String cacheName; + + /** + * @param cacheName Cache name. + * @param deploymentId Deployment ID. + */ + private TemplateConfigurationFuture(String cacheName, IgniteUuid deploymentId) { + this.deploymentId = deploymentId; + this.cacheName = cacheName; + } + + /** + * @return Start ID. + */ + public IgniteUuid deploymentId() { + return deploymentId; + } + + /** {@inheritDoc} */ + @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) { + // Make sure to remove future before completion. + pendingTemplateFuts.remove(maskNull(cacheName), this); + + return super.onDone(res, err); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TemplateConfigurationFuture.class, this); + } + } /** + * + */ private static class LocalAffinityFunction implements AffinityFunction { /** */ private static final long serialVersionUID = 0L; http://git-wip-us.apache.org/repos/asf/ignite/blob/12235254/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index 25ace1b..698b035 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -2229,6 +2229,26 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte } /** {@inheritDoc} */ + @Override public Map<String, String> keyClasses() { + return keyClasses; + } + + /** {@inheritDoc} */ + @Override public Map<String, String> valClasses() { + return valClasses; + } + + /** {@inheritDoc} */ + @Override public Map<String, Map<String, String>> fields() { + return fields; + } + + /** {@inheritDoc} */ + @Override public Map<String, Collection<GridCacheSqlIndexMetadata>> indexes() { + return indexes; + } + + /** {@inheritDoc} */ @Override public Collection<GridCacheSqlIndexMetadata> indexes(String type) { return indexes.get(type); } @@ -2319,6 +2339,11 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte } /** {@inheritDoc} */ + @Override public Collection<String> descendings() { + return descendings; + } + + /** {@inheritDoc} */ @Override public boolean unique() { return unique; } @@ -2687,15 +2712,13 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte * Cached result. */ private abstract static class CachedResult<R> extends GridFutureAdapter<IgniteSpiCloseableIterator<R>> { + /** Absolute position of each recipient. */ + private final Map<Object, QueueIterator> recipients = new GridLeanMap<>(1); /** */ private CircularQueue<R> queue; - /** */ private int pruned; - /** Absolute position of each recipient. */ - private final Map<Object, QueueIterator> recipients = new GridLeanMap<>(1); - /** * @param rcpt ID of the recipient. 
*/ @@ -3059,6 +3082,47 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte } /** + * + */ + private class OffheapIteratorClosure + extends CX2<T2<Long, Integer>, T2<Long, Integer>, IgniteBiTuple<K, V>> { + /** */ + private static final long serialVersionUID = 7410163202728985912L; + + /** */ + private IgniteBiPredicate<K, V> filter; + + /** */ + private boolean keepPortable; + + /** + * @param filter Filter. + * @param keepPortable Keep portable flag. + */ + private OffheapIteratorClosure( + @Nullable IgniteBiPredicate<K, V> filter, + boolean keepPortable) { + assert filter != null; + + this.filter = filter; + this.keepPortable = keepPortable; + } + + /** {@inheritDoc} */ + @Nullable @Override public IgniteBiTuple<K, V> applyx(T2<Long, Integer> keyPtr, + T2<Long, Integer> valPtr) + throws IgniteCheckedException { + LazyOffheapEntry e = new LazyOffheapEntry(keyPtr, valPtr); + + K key = (K)cctx.unwrapPortableIfNeeded(e.key(), keepPortable); + V val = (V)cctx.unwrapPortableIfNeeded(e.value(), keepPortable); + + if (!filter.apply(key, val)) + return null; + + return new IgniteBiTuple<>(e.key(), (V)cctx.unwrapTemporary(e.value())); + } + } /** * Creates user's SQL fields query for given clause. For more information refer to {@link CacheQuery} * documentation. 
* @@ -3078,4 +3142,4 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte false, keepPortable); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/12235254/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlIndexMetadata.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlIndexMetadata.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlIndexMetadata.java index 6b3ed68..539a156 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlIndexMetadata.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlIndexMetadata.java @@ -51,9 +51,14 @@ public interface GridCacheSqlIndexMetadata extends Externalizable { public boolean descending(String field); /** + * @return Descendings. + */ + public Collection<String> descendings(); + + /** * Gets whether this is a unique index. * * @return {@code True} if index is unique, {@code false} otherwise. 
*/ public boolean unique(); -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/12235254/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlMetadata.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlMetadata.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlMetadata.java index dae034c..724962e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlMetadata.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlMetadata.java @@ -78,6 +78,26 @@ public interface GridCacheSqlMetadata extends Externalizable { @Nullable public Map<String, String> fields(String type); /** + * @return Key classes. + */ + public Map<String, String> keyClasses(); + + /** + * @return Value classes. + */ + public Map<String, String> valClasses(); + + /** + * @return Fields. + */ + public Map<String, Map<String, String>> fields(); + + /** + * @return Indexes. + */ + public Map<String, Collection<GridCacheSqlIndexMetadata>> indexes(); + + /** * Gets descriptors of indexes created for provided type. * See {@link GridCacheSqlIndexMetadata} javadoc for more information. 
* @@ -86,4 +106,4 @@ public interface GridCacheSqlMetadata extends Externalizable { * @see GridCacheSqlIndexMetadata */ public Collection<GridCacheSqlIndexMetadata> indexes(String type); -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/12235254/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestCommand.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestCommand.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestCommand.java index 4f9b3ae..8282d3a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestCommand.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestCommand.java @@ -96,6 +96,9 @@ public enum GridRestCommand { /** Cache size. */ CACHE_SIZE("size"), + /** Cache metadata. */ + CACHE_METADATA("metadata"), + /** Increment. */ ATOMIC_INCREMENT("incr"), @@ -141,6 +144,9 @@ public enum GridRestCommand { /** Execute sql fields query. */ EXECUTE_SQL_FIELDS_QUERY("qryfldexe"), + /** Execute scan query. */ + EXECUTE_SCAN_QUERY("qryscanexe"), + /** Fetch query results. 
*/ FETCH_SQL_QUERY("qryfetch"), @@ -193,4 +199,4 @@ public enum GridRestCommand { public String key() { return key; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/12235254/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java index d54c8bb..df79232 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/GridRestProcessor.java @@ -51,7 +51,7 @@ import org.apache.ignite.internal.processors.rest.protocols.tcp.GridTcpRestProto import org.apache.ignite.internal.processors.rest.request.GridRestCacheRequest; import org.apache.ignite.internal.processors.rest.request.GridRestRequest; import org.apache.ignite.internal.processors.rest.request.GridRestTaskRequest; -import org.apache.ignite.internal.processors.rest.request.RestSqlQueryRequest; +import org.apache.ignite.internal.processors.rest.request.RestQueryRequest; import org.apache.ignite.internal.processors.security.SecurityContext; import org.apache.ignite.internal.util.GridSpinReadWriteLock; import org.apache.ignite.internal.util.future.GridFinishedFuture; @@ -565,6 +565,53 @@ public class GridRestProcessor extends GridProcessorAdapter { } /** + * Applies {@link ConnectorMessageInterceptor} + * from {@link ConnectorConfiguration#getMessageInterceptor()} ()} + * to all user parameters in the request. + * + * @param req Client request. 
+ */ + private void interceptRequest(GridRestRequest req) { + ConnectorMessageInterceptor interceptor = config().getMessageInterceptor(); + + if (interceptor == null) + return; + + if (req instanceof GridRestCacheRequest) { + GridRestCacheRequest req0 = (GridRestCacheRequest) req; + + req0.key(interceptor.onReceive(req0.key())); + req0.value(interceptor.onReceive(req0.value())); + req0.value2(interceptor.onReceive(req0.value2())); + + Map<Object, Object> oldVals = req0.values(); + + if (oldVals != null) { + Map<Object, Object> newVals = U.newHashMap(oldVals.size()); + + for (Map.Entry<Object, Object> e : oldVals.entrySet()) + newVals.put(interceptor.onReceive(e.getKey()), interceptor.onReceive(e.getValue())); + + req0.values(U.sealMap(newVals)); + } + } + else if (req instanceof GridRestTaskRequest) { + GridRestTaskRequest req0 = (GridRestTaskRequest) req; + + List<Object> oldParams = req0.params(); + + if (oldParams != null) { + Collection<Object> newParams = new ArrayList<>(oldParams.size()); + + for (Object o : oldParams) + newParams.add(interceptor.onReceive(o)); + + req0.params(U.sealList(newParams)); + } + } + } + + /** * Applies {@link ConnectorMessageInterceptor} from * {@link ConnectorConfiguration#getMessageInterceptor()} * to all user objects in the response. @@ -609,9 +656,7 @@ public class GridRestProcessor extends GridProcessorAdapter { break; } } - } - - /** + } /** * Applies interceptor to a response object. * Specially handler {@link Map} and {@link Collection} responses. 
* @@ -715,10 +760,11 @@ public class GridRestProcessor extends GridProcessorAdapter { case EXECUTE_SQL_QUERY: case EXECUTE_SQL_FIELDS_QUERY: + case EXECUTE_SCAN_QUERY: case CLOSE_SQL_QUERY: case FETCH_SQL_QUERY: perm = SecurityPermission.CACHE_READ; - name = ((RestSqlQueryRequest)req).cacheName(); + name = ((RestQueryRequest)req).cacheName(); break; @@ -764,6 +810,7 @@ public class GridRestProcessor extends GridProcessorAdapter { case CACHE_METRICS: case CACHE_SIZE: + case CACHE_METADATA: case TOPOLOGY: case NODE: case VERSION: @@ -884,15 +931,13 @@ public class GridRestProcessor extends GridProcessorAdapter { /** Session token id. */ private final UUID sesId; - - /** Security context. */ - private volatile SecurityContext secCtx; - /** * Time when session is used last time. * If this time was set at TIMEDOUT_FLAG, then it should never be changed. */ private final AtomicLong lastTouchTime = new AtomicLong(U.currentTimeMillis()); + /** Security context. */ + private volatile SecurityContext secCtx; /** * @param clientId Client ID. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/12235254/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java index 9d32c17..1bbc754 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java @@ -40,6 +40,7 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; +import org.apache.ignite.internal.processors.cache.query.GridCacheSqlMetadata; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.rest.GridRestCommand; import org.apache.ignite.internal.processors.rest.GridRestResponse; @@ -73,6 +74,7 @@ import static org.apache.ignite.internal.processors.rest.GridRestCommand.CACHE_G import static org.apache.ignite.internal.processors.rest.GridRestCommand.CACHE_GET_AND_PUT_IF_ABSENT; import static org.apache.ignite.internal.processors.rest.GridRestCommand.CACHE_GET_AND_REMOVE; import static org.apache.ignite.internal.processors.rest.GridRestCommand.CACHE_GET_AND_REPLACE; +import static org.apache.ignite.internal.processors.rest.GridRestCommand.CACHE_METADATA; import static org.apache.ignite.internal.processors.rest.GridRestCommand.CACHE_METRICS; import static org.apache.ignite.internal.processors.rest.GridRestCommand.CACHE_PREPEND; import static 
org.apache.ignite.internal.processors.rest.GridRestCommand.CACHE_PUT; @@ -119,7 +121,8 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter { CACHE_APPEND, CACHE_PREPEND, CACHE_METRICS, - CACHE_SIZE + CACHE_SIZE, + CACHE_METADATA ); /** Requests with required parameter {@code key}. */ @@ -151,11 +154,6 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter { super(ctx); } - /** {@inheritDoc} */ - @Override public Collection<GridRestCommand> supportedCommands() { - return SUPPORTED_COMMANDS; - } - /** * Retrieves cache flags from corresponding bits. * @@ -172,6 +170,153 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter { return false; } + /** + * Handles append and prepend commands. + * + * @param ctx Kernal context. + * @param cache Cache. + * @param key Key. + * @param req Request. + * @param prepend Whether to prepend. + * @return Future of operation result. + * @throws IgniteCheckedException In case of any exception. + */ + private static IgniteInternalFuture<?> appendOrPrepend( + final GridKernalContext ctx, + final IgniteInternalCache<Object, Object> cache, + final Object key, GridRestCacheRequest req, final boolean prepend) throws IgniteCheckedException { + assert cache != null; + assert key != null; + assert req != null; + + final Object val = req.value(); + + if (val == null) + throw new IgniteCheckedException(GridRestCommandHandlerAdapter.missingParameter("val")); + + return ctx.closure().callLocalSafe(new Callable<Object>() { + @Override public Object call() throws Exception { + try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { + Object curVal = cache.get(key); + + if (curVal == null) + return false; + + // Modify current value with appendix one. + Object newVal = appendOrPrepend(curVal, val, !prepend); + + // Put new value asynchronously. 
+ cache.put(key, newVal); + + tx.commit(); + } + + return true; + } + }, false); + } + + /** + * Append or prepend new value to the current one. + * + * @param origVal Original value. + * @param appendVal Appendix value to add to the original one. + * @param appendPlc Append or prepend policy flag. + * @return Resulting value. + * @throws IgniteCheckedException In case of grid exceptions. + */ + private static Object appendOrPrepend(Object origVal, Object appendVal, boolean appendPlc) throws IgniteCheckedException { + // Strings. + if (appendVal instanceof String && origVal instanceof String) + return appendPlc ? origVal + (String)appendVal : (String)appendVal + origVal; + + // Maps. + if (appendVal instanceof Map && origVal instanceof Map) { + Map<Object, Object> origMap = (Map<Object, Object>)origVal; + Map<Object, Object> appendMap = (Map<Object, Object>)appendVal; + + Map<Object, Object> map = X.cloneObject(origMap, false, true); + + if (appendPlc) + map.putAll(appendMap); // Append. + else { + map.clear(); + map.putAll(appendMap); // Prepend. + map.putAll(origMap); + } + + for (Map.Entry<Object, Object> e : appendMap.entrySet()) // Remove zero-valued entries. + if (e.getValue() == null && map.get(e.getKey()) == null) + map.remove(e.getKey()); + + return map; + } + + // Generic collection. + if (appendVal instanceof Collection<?> && origVal instanceof Collection<?>) { + Collection<Object> origCol = (Collection<Object>)origVal; + Collection<Object> appendCol = (Collection<Object>)appendVal; + + Collection<Object> col = X.cloneObject(origCol, false, true); + + if (appendPlc) + col.addAll(appendCol); // Append. + else { + col.clear(); + col.addAll(appendCol); // Prepend. + col.addAll(origCol); + } + + return col; + } + + throw new IgniteCheckedException("Incompatible types [appendVal=" + appendVal + ", old=" + origVal + ']'); + } + + /** + * Creates a transformation function from {@link CacheCommand}'s results into {@link GridRestResponse}. 
+ * + * @param c Cache instance to obtain affinity data. + * @param key Affinity key for previous operation. + * @return Rest response. + */ + private static IgniteClosure<IgniteInternalFuture<?>, GridRestResponse> resultWrapper( + final IgniteInternalCache<Object, Object> c, @Nullable final Object key) { + return new CX1<IgniteInternalFuture<?>, GridRestResponse>() { + @Override public GridRestResponse applyx(IgniteInternalFuture<?> f) throws IgniteCheckedException { + GridCacheRestResponse resp = new GridCacheRestResponse(); + + resp.setResponse(f.get()); + + if (key != null) + resp.setAffinityNodeId(c.cache().affinity().mapKeyToNode(key).id().toString()); + + return resp; + } + }; + } + + /** + * @param ignite Grid instance. + * @param cacheName Name of the cache. + * @return Instance on the named cache. + * @throws IgniteCheckedException If cache not found. + */ + private static IgniteInternalCache<Object, Object> cache(Ignite ignite, String cacheName) throws IgniteCheckedException { + IgniteInternalCache<Object, Object> cache = ((IgniteKernal)ignite).getCache(cacheName); + + if (cache == null) + throw new IgniteCheckedException( + "Failed to find cache for given cache name (null for default cache): " + cacheName); + + return cache; + } + + /** {@inheritDoc} */ + @Override public Collection<GridRestCommand> supportedCommands() { + return SUPPORTED_COMMANDS; + } + /** {@inheritDoc} */ @Override public IgniteInternalFuture<GridRestResponse> handleAsync(final GridRestRequest req) { assert req instanceof GridRestCacheRequest : "Invalid command for topology handler: " + req; @@ -224,6 +369,25 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter { break; } + case CACHE_METADATA: { + IgniteInternalCache<?, ?> cache = ctx.cache().cache(cacheName); + + if (cache != null) { + GridCacheSqlMetadata res = F.first(cache.context().queries().sqlMetadata()); + + fut = new GridFinishedFuture<>(new GridRestResponse(res)); + } + else { + ClusterGroup prj = 
ctx.grid().cluster().forDataNodes(cacheName); + + ctx.task().setThreadContext(TC_NO_FAILOVER, true); + + fut = ctx.closure().callAsync(BALANCE, new MetadataCommand(cacheName), prj.nodes()); + } + + break; + } + case CACHE_CONTAINS_KEYS: { fut = executeCommand(req.destinationId(), req.clientId(), cacheName, skipStore, key, new ContainsKeysCommand(getKeys(req0))); @@ -545,138 +709,12 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter { } } - /** - * Handles append and prepend commands. - * - * @param ctx Kernal context. - * @param cache Cache. - * @param key Key. - * @param req Request. - * @param prepend Whether to prepend. - * @return Future of operation result. - * @throws IgniteCheckedException In case of any exception. - */ - private static IgniteInternalFuture<?> appendOrPrepend( - final GridKernalContext ctx, - final IgniteInternalCache<Object, Object> cache, - final Object key, GridRestCacheRequest req, final boolean prepend) throws IgniteCheckedException { - assert cache != null; - assert key != null; - assert req != null; - - final Object val = req.value(); - - if (val == null) - throw new IgniteCheckedException(GridRestCommandHandlerAdapter.missingParameter("val")); - - return ctx.closure().callLocalSafe(new Callable<Object>() { - @Override public Object call() throws Exception { - try (IgniteInternalTx tx = cache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { - Object curVal = cache.get(key); - - if (curVal == null) - return false; - - // Modify current value with appendix one. - Object newVal = appendOrPrepend(curVal, val, !prepend); - - // Put new value asynchronously. - cache.put(key, newVal); - - tx.commit(); - } - - return true; - } - }, false); - } - - /** - * Append or prepend new value to the current one. - * - * @param origVal Original value. - * @param appendVal Appendix value to add to the original one. - * @param appendPlc Append or prepend policy flag. - * @return Resulting value. 
- * @throws IgniteCheckedException In case of grid exceptions. - */ - private static Object appendOrPrepend(Object origVal, Object appendVal, boolean appendPlc) throws IgniteCheckedException { - // Strings. - if (appendVal instanceof String && origVal instanceof String) - return appendPlc ? origVal + (String)appendVal : (String)appendVal + origVal; - - // Maps. - if (appendVal instanceof Map && origVal instanceof Map) { - Map<Object, Object> origMap = (Map<Object, Object>)origVal; - Map<Object, Object> appendMap = (Map<Object, Object>)appendVal; - - Map<Object, Object> map = X.cloneObject(origMap, false, true); - - if (appendPlc) - map.putAll(appendMap); // Append. - else { - map.clear(); - map.putAll(appendMap); // Prepend. - map.putAll(origMap); - } - - for (Map.Entry<Object, Object> e : appendMap.entrySet()) // Remove zero-valued entries. - if (e.getValue() == null && map.get(e.getKey()) == null) - map.remove(e.getKey()); - - return map; - } - - // Generic collection. - if (appendVal instanceof Collection<?> && origVal instanceof Collection<?>) { - Collection<Object> origCol = (Collection<Object>)origVal; - Collection<Object> appendCol = (Collection<Object>)appendVal; - - Collection<Object> col = X.cloneObject(origCol, false, true); - - if (appendPlc) - col.addAll(appendCol); // Append. - else { - col.clear(); - col.addAll(appendCol); // Prepend. - col.addAll(origCol); - } - - return col; - } - - throw new IgniteCheckedException("Incompatible types [appendVal=" + appendVal + ", old=" + origVal + ']'); - } - /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridCacheCommandHandler.class, this); } /** - * Creates a transformation function from {@link CacheCommand}'s results into {@link GridRestResponse}. - * - * @param c Cache instance to obtain affinity data. - * @param key Affinity key for previous operation. - * @return Rest response. 
- */ - private static IgniteClosure<IgniteInternalFuture<?>, GridRestResponse> resultWrapper( - final IgniteInternalCache<Object, Object> c, @Nullable final Object key) { - return new CX1<IgniteInternalFuture<?>, GridRestResponse>() { - @Override public GridRestResponse applyx(IgniteInternalFuture<?> f) throws IgniteCheckedException { - GridCacheRestResponse resp = new GridCacheRestResponse(); - - resp.setResponse(f.get()); - - if (key != null) - resp.setAffinityNodeId(c.cache().affinity().mapKeyToNode(key).id().toString()); - - return resp; - } - }; - } - - /** * @param cacheName Cache name. * @return If replicated cache with given name is locally available. */ @@ -702,22 +740,6 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter { } /** - * @param ignite Grid instance. - * @param cacheName Name of the cache. - * @return Instance on the named cache. - * @throws IgniteCheckedException If cache not found. - */ - private static IgniteInternalCache<Object, Object> cache(Ignite ignite, String cacheName) throws IgniteCheckedException { - IgniteInternalCache<Object, Object> cache = ((IgniteKernal)ignite).getCache(cacheName); - - if (cache == null) - throw new IgniteCheckedException( - "Failed to find cache for given cache name (null for default cache): " + cacheName); - - return cache; - } - - /** * Fixed result closure. */ private static final class FixedResult extends CX1<IgniteInternalFuture<?>, Object> { @@ -771,22 +793,16 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter { private static class FlaggedCacheOperationCallable implements Callable<GridRestResponse>, Serializable { /** */ private static final long serialVersionUID = 0L; - - /** Client ID. */ - private UUID clientId; - /** */ private final String cacheName; - /** */ private final boolean skipStore; - /** */ private final CacheProjectionCommand op; - /** */ private final Object key; - + /** Client ID. 
*/ + private UUID clientId; /** */ @IgniteInstanceResource private Ignite g; @@ -829,19 +845,14 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter { private static class CacheOperationCallable implements Callable<GridRestResponse>, Serializable { /** */ private static final long serialVersionUID = 0L; - - /** Client ID. */ - private UUID clientId; - /** */ private final String cacheName; - /** */ private final CacheCommand op; - /** */ private final Object key; - + /** Client ID. */ + private UUID clientId; /** */ @IgniteInstanceResource private Ignite g; @@ -892,6 +903,31 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter { } /** */ + private static class MetadataCommand implements Callable<GridRestResponse>, Serializable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private final String cacheName; + + /** */ + @IgniteInstanceResource + private Ignite g; + + /** + * @param cacheName Cache name. + */ + private MetadataCommand(String cacheName) { + this.cacheName = cacheName; + } + + /** {@inheritDoc} */ + @Override public GridRestResponse call() throws Exception { + return new GridRestResponse(F.first(cache(g, cacheName).context().queries().sqlMetadata())); + } + } + + /** */ private static class ContainsKeysCommand extends CacheProjectionCommand { /** */ private static final long serialVersionUID = 0L; http://git-wip-us.apache.org/repos/asf/ignite/blob/12235254/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java index 64c7673..f4ddd59 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/query/QueryCommandHandler.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.processors.rest.handlers.query; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; @@ -25,8 +27,10 @@ import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; import org.apache.ignite.cache.query.Query; import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.cache.query.ScanQuery; import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.cache.query.SqlQuery; import org.apache.ignite.internal.GridKernalContext; @@ -37,12 +41,14 @@ import org.apache.ignite.internal.processors.rest.GridRestCommand; import org.apache.ignite.internal.processors.rest.GridRestResponse; import org.apache.ignite.internal.processors.rest.handlers.GridRestCommandHandlerAdapter; import org.apache.ignite.internal.processors.rest.request.GridRestRequest; -import org.apache.ignite.internal.processors.rest.request.RestSqlQueryRequest; +import org.apache.ignite.internal.processors.rest.request.RestQueryRequest; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgniteBiTuple; import static org.apache.ignite.internal.processors.rest.GridRestCommand.CLOSE_SQL_QUERY; +import static org.apache.ignite.internal.processors.rest.GridRestCommand.EXECUTE_SCAN_QUERY; import static org.apache.ignite.internal.processors.rest.GridRestCommand.EXECUTE_SQL_FIELDS_QUERY; 
import static org.apache.ignite.internal.processors.rest.GridRestCommand.EXECUTE_SQL_QUERY; import static org.apache.ignite.internal.processors.rest.GridRestCommand.FETCH_SQL_QUERY; @@ -54,6 +60,7 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { /** Supported commands. */ private static final Collection<GridRestCommand> SUPPORTED_COMMANDS = U.sealList(EXECUTE_SQL_QUERY, EXECUTE_SQL_FIELDS_QUERY, + EXECUTE_SCAN_QUERY, FETCH_SQL_QUERY, CLOSE_SQL_QUERY); @@ -70,6 +77,76 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { super(ctx); } + /** + * @param qryCurs Query cursors. + * @param cur Current cursor. + * @param req Sql request. + * @param qryId Query id. + * @return Query result with items. + */ + private static CacheQueryResult createQueryResult( + ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCurs, + Iterator cur, RestQueryRequest req, Long qryId) { + CacheQueryResult res = new CacheQueryResult(); + + List<Object> items = new ArrayList<>(); + + for (int i = 0; i < req.pageSize() && cur.hasNext(); ++i) + items.add(cur.next()); + + res.setItems(items); + + res.setLast(!cur.hasNext()); + + res.setQueryId(qryId); + + if (!cur.hasNext()) + qryCurs.remove(qryId); + + return res; + } + + /** + * Creates class instance. + * + * @param cls Target class. + * @param clsName Implementing class name. + * @return Class instance. + * @throws IgniteException If failed. + */ + private static <T> T instance(Class<? 
extends T> cls, String clsName) throws IgniteException { + try { + Class<?> implCls = Class.forName(clsName); + + if (!cls.isAssignableFrom(implCls)) + throw new IgniteException("Failed to create instance (target class does not extend or implement " + + "required class or interface) [cls=" + cls.getName() + ", clsName=" + clsName + ']'); + + Constructor<?> ctor = implCls.getConstructor(); + + return (T)ctor.newInstance(); + } + catch (ClassNotFoundException e) { + throw new IgniteException("Failed to find target class: " + clsName, e); + } + catch (NoSuchMethodException e) { + throw new IgniteException("Failed to find constructor for provided arguments " + + "[clsName=" + clsName + ']', e); + } + catch (InstantiationException e) { + throw new IgniteException("Failed to instantiate target class " + + "[clsName=" + clsName + ']', e); + } + catch (IllegalAccessException e) { + throw new IgniteException("Failed to instantiate class (constructor is not available) " + + "[clsName=" + clsName + ']', e); + } + catch (InvocationTargetException e) { + throw new IgniteException("Failed to instantiate class (constructor threw an exception) " + + "[clsName=" + clsName + ']', e.getCause()); + } + } + /** {@inheritDoc} */ @Override public Collection<GridRestCommand> supportedCommands() { return SUPPORTED_COMMANDS; @@ -80,23 +157,24 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { assert req != null; assert SUPPORTED_COMMANDS.contains(req.command()); - assert req instanceof RestSqlQueryRequest : "Invalid type of query request."; + assert req instanceof RestQueryRequest : "Invalid type of query request."; switch (req.command()) { case EXECUTE_SQL_QUERY: - case EXECUTE_SQL_FIELDS_QUERY: { + case EXECUTE_SQL_FIELDS_QUERY: + case EXECUTE_SCAN_QUERY: { return ctx.closure().callLocalSafe( - new ExecuteQueryCallable(ctx, (RestSqlQueryRequest)req, qryCurs), false); + new ExecuteQueryCallable(ctx, (RestQueryRequest)req, qryCurs), false); } case FETCH_SQL_QUERY: { 
return ctx.closure().callLocalSafe( - new FetchQueryCallable((RestSqlQueryRequest)req, qryCurs), false); + new FetchQueryCallable(ctx, (RestQueryRequest)req, qryCurs), false); } case CLOSE_SQL_QUERY: { return ctx.closure().callLocalSafe( - new CloseQueryCallable((RestSqlQueryRequest)req, qryCurs), false); + new CloseQueryCallable((RestQueryRequest)req, qryCurs), false); } } @@ -111,7 +189,7 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { private GridKernalContext ctx; /** Execute query request. */ - private RestSqlQueryRequest req; + private RestQueryRequest req; /** Queries cursors. */ private ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCurs; @@ -121,7 +199,7 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { * @param req Execute query request. * @param qryCurs Queries cursors. */ - public ExecuteQueryCallable(GridKernalContext ctx, RestSqlQueryRequest req, + public ExecuteQueryCallable(GridKernalContext ctx, RestQueryRequest req, ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCurs) { this.ctx = ctx; this.req = req; @@ -135,15 +213,33 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { try { Query qry; - if (req.typeName() != null) { - qry = new SqlQuery(req.typeName(), req.sqlQuery()); + switch (req.queryType()) { + case SQL: + qry = new SqlQuery(req.typeName(), req.sqlQuery()); - ((SqlQuery)qry).setArgs(req.arguments()); - } - else { - qry = new SqlFieldsQuery(req.sqlQuery()); + ((SqlQuery)qry).setArgs(req.arguments()); + + break; + + case SQL_FIELDS: + qry = new SqlFieldsQuery(req.sqlQuery()); + + ((SqlFieldsQuery)qry).setArgs(req.arguments()); - ((SqlFieldsQuery)qry).setArgs(req.arguments()); + break; + + case SCAN: + IgniteBiPredicate pred = null; + + if (req.className() != null) + pred = instance(IgniteBiPredicate.class, req.className()); + + qry = new ScanQuery(pred); + + break; + + default: + throw new IgniteException("Incorrect query type 
[type=" + req.queryType() + "]"); } IgniteCache<Object, Object> cache = ctx.grid().cache(req.cacheName()); @@ -160,9 +256,25 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { CacheQueryResult res = createQueryResult(qryCurs, cur, req, qryId); - List<GridQueryFieldMetadata> fieldsMeta = ((QueryCursorImpl<?>) qryCur).fieldsMeta(); + switch (req.queryType()) { + case SQL: + case SQL_FIELDS: + List<GridQueryFieldMetadata> fieldsMeta = ((QueryCursorImpl) qryCur).fieldsMeta(); + + res.setFieldsMetadata(convertMetadata(fieldsMeta)); - res.setFieldsMetadata(convertMetadata(fieldsMeta)); + break; + case SCAN: + CacheQueryFieldsMetaResult keyField = new CacheQueryFieldsMetaResult(); + keyField.setFieldName("key"); + + CacheQueryFieldsMetaResult valField = new CacheQueryFieldsMetaResult(); + valField.setFieldName("value"); + + res.setFieldsMetadata(U.sealList(keyField, valField)); + + break; + } return new GridRestResponse(res); } @@ -193,17 +305,16 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { * Close query callable. */ private static class CloseQueryCallable implements Callable<GridRestResponse> { - /** Execute query request. */ - private RestSqlQueryRequest req; - /** Queries cursors. */ private final ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCurs; + /** Execute query request. */ + private RestQueryRequest req; /** * @param req Execute query request. * @param qryCurs Queries cursors. */ - public CloseQueryCallable(RestSqlQueryRequest req, + public CloseQueryCallable(RestQueryRequest req, ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCurs) { this.req = req; this.qryCurs = qryCurs; @@ -236,18 +347,21 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { * Fetch query callable. */ private static class FetchQueryCallable implements Callable<GridRestResponse> { - /** Execute query request. */ - private RestSqlQueryRequest req; - /** Queries cursors. 
*/ private final ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCurs; + /** Grid kernal context. */ + private final GridKernalContext ctx; + /** Execute query request. */ + private RestQueryRequest req; /** + * @param ctx Grid kernal context. * @param req Execute query request. * @param qryCurs Queries cursors. */ - public FetchQueryCallable(RestSqlQueryRequest req, + public FetchQueryCallable(GridKernalContext ctx, RestQueryRequest req, ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCurs) { + this.ctx = ctx; this.req = req; this.qryCurs = qryCurs; } @@ -272,33 +386,4 @@ public class QueryCommandHandler extends GridRestCommandHandlerAdapter { } } } - - /** - * @param qryCurs Query cursors. - * @param cur Current cursor. - * @param req Sql request. - * @param qryId Query id. - * @return Query result with items. - */ - private static CacheQueryResult createQueryResult( - ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCurs, - Iterator cur, RestSqlQueryRequest req, Long qryId) { - CacheQueryResult res = new CacheQueryResult(); - - List<Object> items = new ArrayList<>(); - - for (int i = 0; i < req.pageSize() && cur.hasNext(); ++i) - items.add(cur.next()); - - res.setItems(items); - - res.setLast(!cur.hasNext()); - - res.setQueryId(qryId); - - if (!cur.hasNext()) - qryCurs.remove(qryId); - - return res; - } -} \ No newline at end of file +}