ignite-sql - all
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/8dcf8954 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/8dcf8954 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/8dcf8954 Branch: refs/heads/ignite-sql Commit: 8dcf8954f911de7cf6dff12c3c8d42c98d756760 Parents: 9376bf9 Author: S.Vladykin <[email protected]> Authored: Thu Feb 5 00:54:21 2015 +0300 Committer: S.Vladykin <[email protected]> Committed: Thu Feb 5 00:54:21 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/IgniteCacheProxy.java | 145 ++++++++++++------- .../processors/query/GridQueryIndexing.java | 7 + .../processors/query/GridQueryProcessor.java | 77 ++++++++++ .../processors/query/h2/IgniteH2Indexing.java | 23 ++- .../query/h2/twostep/GridMapQueryExecutor.java | 18 +-- 5 files changed, 197 insertions(+), 73 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8dcf8954/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index 56dd6e1..87fe0b7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -20,11 +20,10 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cache.query.*; +import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.query.*; import org.apache.ignite.internal.util.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.mxbean.*; import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.*; @@ -185,7 +184,7 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter<IgniteCach try { if (isAsync()) - setFuture(delegate.<K, V>cache().loadCacheAsync(p, 0, args)); + setFuture(delegate.<K,V>cache().loadCacheAsync(p, 0, args)); else delegate.<K, V>cache().loadCache(p, 0, args); } @@ -243,66 +242,84 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter<IgniteCach } } - /** {@inheritDoc} */ + /** + * @param filter Filter. + * @param grp Optional cluster group. + * @return Cursor. + */ @SuppressWarnings("unchecked") - @Override public QueryCursor<Entry<K,V>> query(QueryPredicate filter) { - A.notNull(filter, "filter"); + private QueryCursor<Entry<K,V>> query(QueryPredicate filter, @Nullable ClusterGroup grp) { + final CacheQuery<Map.Entry<K,V>> qry; + final CacheQueryFuture<Map.Entry<K,V>> fut; - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + if (filter instanceof QueryScanPredicate) { + qry = delegate.queries().createScanQuery((IgniteBiPredicate<K,V>)filter); - try { - if (filter instanceof QuerySqlPredicate) { - QuerySqlPredicate p = (QuerySqlPredicate)filter; + if (grp != null) + qry.projection(grp); - return ctx.kernalContext().query().queryTwoStep(ctx.name(), p.getType(), p.getSql(), p.getArgs()); - } + fut = qry.execute(); + } + else if (filter instanceof QueryTextPredicate) { + QueryTextPredicate p = (QueryTextPredicate)filter; - final CacheQuery<Map.Entry<K,V>> qry; - final CacheQueryFuture<Map.Entry<K,V>> fut; + qry = delegate.queries().createFullTextQuery(p.getType(), p.getText()); - if (filter instanceof QueryScanPredicate) { - qry = delegate.queries().createScanQuery((IgniteBiPredicate<K,V>)filter); + if (grp != null) + qry.projection(grp); - fut = qry.execute(); - } - else if (filter instanceof QueryTextPredicate) { - QueryTextPredicate p = (QueryTextPredicate)filter; + fut = qry.execute(); + } + else if (filter instanceof QuerySpiPredicate) { + qry = ((GridCacheQueriesEx)delegate.queries()).createSpiQuery(); - qry = delegate.queries().createFullTextQuery(p.getType(), p.getText()); + if (grp != null) + qry.projection(grp); - fut = qry.execute(); - } - else if (filter instanceof QuerySpiPredicate) { - qry = ((GridCacheQueriesEx)delegate.queries()).createSpiQuery(); + fut = qry.execute(((QuerySpiPredicate)filter).getArgs()); + } + else + throw new IgniteException("Unsupported query predicate: " + filter); + + return new QueryCursorImpl<>(new GridCloseableIteratorAdapter<Entry<K,V>>() { + /** */ + Map.Entry<K,V> cur; + + @Override protected Entry<K,V> onNext() throws IgniteCheckedException { + if (!onHasNext()) + throw new NoSuchElementException(); - fut = qry.execute(((QuerySpiPredicate)filter).getArgs()); + Map.Entry<K,V> e = cur; + + cur = null; + + return new CacheEntryImpl<>(e.getKey(), e.getValue()); } - else - throw new IgniteException("Unsupported query predicate: " + filter); - return new QueryCursorImpl<>(new GridCloseableIteratorAdapter<Entry<K,V>>() { - /** */ - Map.Entry<K,V> cur; + @Override protected boolean onHasNext() throws IgniteCheckedException { + return cur != null || (cur = fut.next()) != null; + } - @Override protected Entry<K,V> onNext() throws IgniteCheckedException { - if (!onHasNext()) - throw new NoSuchElementException(); + @Override protected void onClose() throws IgniteCheckedException { + fut.cancel(); + } + }); + } - Map.Entry<K,V> e = cur; + /** {@inheritDoc} */ + @Override public QueryCursor<Entry<K,V>> query(QueryPredicate filter) { + A.notNull(filter, "filter"); - cur = null; + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - return new CacheEntryImpl<>(e.getKey(), e.getValue()); - } + try { + if (filter instanceof QuerySqlPredicate) { + QuerySqlPredicate p = (QuerySqlPredicate)filter; - @Override protected boolean onHasNext() throws IgniteCheckedException { - return cur != null || (cur = fut.next()) != null; - } + return ctx.kernalContext().query().queryTwoStep(ctx.name(), p.getType(), p.getSql(), p.getArgs()); + } - @Override protected void onClose() throws IgniteCheckedException { - fut.cancel(); - } - }); + return query(filter, null); } finally { gate.leave(prev); @@ -313,24 +330,50 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter<IgniteCach @Override public QueryCursor<List<?>> queryFields(QuerySqlPredicate filter) { A.notNull(filter, "filter"); - // TODO implement - return null; + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + + try { + return ctx.kernalContext().query().queryTwoStep(ctx.name(), filter.getSql(), filter.getArgs()); + } + finally { + gate.leave(prev); + } } /** {@inheritDoc} */ @Override public QueryCursor<Entry<K,V>> localQuery(QueryPredicate filter) { A.notNull(filter, "filter"); - // TODO implement - return null; + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + + try { + if (filter instanceof QuerySqlPredicate) { + QuerySqlPredicate p = (QuerySqlPredicate)filter; + + return new QueryCursorImpl<>(ctx.kernalContext().query().<K,V>queryLocal( + ctx.name(), p.getType(), p.getSql(), p.getArgs())); + } + + return query(filter, ctx.kernalContext().grid().forLocal()); + } + finally { + gate.leave(prev); + } } /** {@inheritDoc} */ @Override public QueryCursor<List<?>> localQueryFields(QuerySqlPredicate filter) { A.notNull(filter, "filter"); - // TODO implement - return null; + GridCacheProjectionImpl<K, V> prev = gate.enter(prj); + + try { + return new QueryCursorImpl<>(ctx.kernalContext().query().queryLocalFields( + ctx.name(), filter.getSql(), filter.getArgs())); + } + finally { + gate.leave(prev); + } } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8dcf8954/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java index 5a5d09a..5a7e1d3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java @@ -201,4 +201,11 @@ public interface GridQueryIndexing { * @param type Type descriptor. */ public void rebuildIndexes(@Nullable String spaceName, GridQueryTypeDescriptor type); + + /** + * Returns backup filter. + * + * @return Backup filter. + */ + public IndexingQueryFilter backupFilter(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8dcf8954/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index 903c4e1..3a21586 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -24,6 +24,7 @@ import org.apache.ignite.cache.query.annotations.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.*; +import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.query.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.future.*; @@ -494,6 +495,82 @@ public class GridQueryProcessor extends GridProcessorAdapter { /** * @param space Space. + * @param type Type. + * @param sqlQry Query. + * @param params Parameters. + * @return Cursor. + */ + public <K,V> Iterator<Cache.Entry<K,V>> queryLocal(String space, String type, String sqlQry, Object[] params) { + if (!busyLock.enterBusy()) + throw new IllegalStateException("Failed to execute query (grid is stopping)."); + + try { + TypeDescriptor typeDesc = typesByName.get(new TypeName(space, type)); + + if (typeDesc == null || !typeDesc.registered()) + return new GridEmptyCloseableIterator<>(); + + final GridCloseableIterator<IgniteBiTuple<K,V>> i = idx.query(space, sqlQry, F.asList(params), typeDesc, + idx.backupFilter()); + + return new ClIter<Cache.Entry<K,V>>() { + @Override public void close() throws Exception { + i.close(); + } + + @Override public boolean hasNext() { + return i.hasNext(); + } + + @Override public Cache.Entry<K,V> next() { + IgniteBiTuple<K,V> t = i.next(); + + return new CacheEntryImpl<>(t.getKey(), t.getValue()); + } + + @Override public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + finally { + busyLock.leaveBusy(); + } + } + + /** + * Closeable iterator. + */ + private static interface ClIter<X> extends AutoCloseable, Iterator<X> { + // No-op. + } + + /** + * @param space Space. + * @param sql SQL Query. + * @param args Arguments. + * @return Iterator. + */ + public Iterator<List<?>> queryLocalFields(String space, String sql, Object[] args) { + if (!busyLock.enterBusy()) + throw new IllegalStateException("Failed to execute query (grid is stopping)."); + + try { + return idx.queryFields(space, sql, F.asList(args), idx.backupFilter()).iterator(); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + finally { + busyLock.leaveBusy(); + } + } + + /** + * @param space Space. * @param key Key. * @throws IgniteCheckedException Thrown in case of any errors. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8dcf8954/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index be0d2d0..1169fcf 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -34,11 +34,6 @@ import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.offheap.unsafe.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.internal.util.future.*; -import org.apache.ignite.internal.util.lang.*; -import org.apache.ignite.internal.util.offheap.unsafe.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.apache.ignite.marshaller.*; import org.apache.ignite.marshaller.optimized.*; @@ -1440,6 +1435,24 @@ public class IgniteH2Indexing implements GridQueryIndexing { schemaNames.add(schema(spaceName)); } + /** {@inheritDoc} */ + @Override public IndexingQueryFilter backupFilter() { + return new IndexingQueryFilter() { + @Nullable @Override public <K, V> IgniteBiPredicate<K, V> forSpace(String spaceName) { + final GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(spaceName); + + if (cache.context().isReplicated() || cache.configuration().getBackups() == 0) + return null; + + return new IgniteBiPredicate<K, V>() { + @Override public boolean apply(K k, V v) { + return cache.context().affinity().primary(ctx.discovery().localNode(), k, -1); + } + }; + } + }; + } + /** * Wrapper to store connection and flag is schema set or not. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8dcf8954/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java index 12a5b83..01483b2 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java @@ -12,18 +12,15 @@ package org.apache.ignite.internal.processors.query.h2.twostep; import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.query.*; import org.apache.ignite.internal.processors.query.h2.*; import org.apache.ignite.internal.processors.query.h2.twostep.messages.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.lang.*; -import org.apache.ignite.spi.indexing.*; import org.h2.jdbc.*; import org.h2.result.*; import org.h2.value.*; import org.jdk8.backport.*; -import org.jetbrains.annotations.*; import java.lang.reflect.*; import java.sql.*; @@ -97,20 +94,7 @@ public class GridMapQueryExecutor { * @param req Query request. */ private void executeLocalQuery(ClusterNode node, GridQueryRequest req) { - h2.setFilters(new IndexingQueryFilter() { - @Nullable @Override public <K, V> IgniteBiPredicate<K, V> forSpace(String spaceName) { - final GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(spaceName); - - if (cache.context().isReplicated() || cache.configuration().getBackups() == 0) - return null; - - return new IgniteBiPredicate<K, V>() { - @Override public boolean apply(K k, V v) { - return cache.context().affinity().primary(ctx.discovery().localNode(), k, -1); - } - }; - } - }); + h2.setFilters(h2.backupFilter()); try { QueryResults qr = new QueryResults(req.requestId(), req.queries().size());
