ignite-sql - all

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/8dcf8954
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/8dcf8954
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/8dcf8954

Branch: refs/heads/ignite-sql
Commit: 8dcf8954f911de7cf6dff12c3c8d42c98d756760
Parents: 9376bf9
Author: S.Vladykin <[email protected]>
Authored: Thu Feb 5 00:54:21 2015 +0300
Committer: S.Vladykin <[email protected]>
Committed: Thu Feb 5 00:54:21 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/IgniteCacheProxy.java      | 145 ++++++++++++-------
 .../processors/query/GridQueryIndexing.java     |   7 +
 .../processors/query/GridQueryProcessor.java    |  77 ++++++++++
 .../processors/query/h2/IgniteH2Indexing.java   |  23 ++-
 .../query/h2/twostep/GridMapQueryExecutor.java  |  18 +--
 5 files changed, 197 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8dcf8954/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 56dd6e1..87fe0b7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -20,11 +20,10 @@ package org.apache.ignite.internal.processors.cache;
 import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.cache.query.*;
+import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.processors.cache.query.*;
 import org.apache.ignite.internal.util.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.mxbean.*;
 import org.apache.ignite.internal.util.future.*;
 import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.*;
@@ -185,7 +184,7 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter<IgniteCach
 
             try {
                 if (isAsync())
-                    setFuture(delegate.<K, V>cache().loadCacheAsync(p, 0, 
args));
+                    setFuture(delegate.<K,V>cache().loadCacheAsync(p, 0, 
args));
                 else
                     delegate.<K, V>cache().loadCache(p, 0, args);
             }
@@ -243,66 +242,84 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter<IgniteCach
         }
     }
 
-    /** {@inheritDoc} */
+    /**
+     * @param filter Filter.
+     * @param grp Optional cluster group.
+     * @return Cursor.
+     */
     @SuppressWarnings("unchecked")
-    @Override public QueryCursor<Entry<K,V>> query(QueryPredicate filter) {
-        A.notNull(filter, "filter");
+    private QueryCursor<Entry<K,V>> query(QueryPredicate filter, @Nullable 
ClusterGroup grp) {
+        final CacheQuery<Map.Entry<K,V>> qry;
+        final CacheQueryFuture<Map.Entry<K,V>> fut;
 
-        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+        if (filter instanceof QueryScanPredicate) {
+            qry = 
delegate.queries().createScanQuery((IgniteBiPredicate<K,V>)filter);
 
-        try {
-            if (filter instanceof QuerySqlPredicate) {
-                QuerySqlPredicate p = (QuerySqlPredicate)filter;
+            if (grp != null)
+                qry.projection(grp);
 
-                return ctx.kernalContext().query().queryTwoStep(ctx.name(), 
p.getType(), p.getSql(), p.getArgs());
-            }
+            fut = qry.execute();
+        }
+        else if (filter instanceof QueryTextPredicate) {
+            QueryTextPredicate p = (QueryTextPredicate)filter;
 
-            final CacheQuery<Map.Entry<K,V>> qry;
-            final CacheQueryFuture<Map.Entry<K,V>> fut;
+            qry = delegate.queries().createFullTextQuery(p.getType(), 
p.getText());
 
-            if (filter instanceof QueryScanPredicate) {
-                qry = 
delegate.queries().createScanQuery((IgniteBiPredicate<K,V>)filter);
+            if (grp != null)
+                qry.projection(grp);
 
-                fut = qry.execute();
-            }
-            else if (filter instanceof QueryTextPredicate) {
-                QueryTextPredicate p = (QueryTextPredicate)filter;
+            fut = qry.execute();
+        }
+        else if (filter instanceof QuerySpiPredicate) {
+            qry = ((GridCacheQueriesEx)delegate.queries()).createSpiQuery();
 
-                qry = delegate.queries().createFullTextQuery(p.getType(), 
p.getText());
+            if (grp != null)
+                qry.projection(grp);
 
-                fut = qry.execute();
-            }
-            else if (filter instanceof QuerySpiPredicate) {
-                qry = 
((GridCacheQueriesEx)delegate.queries()).createSpiQuery();
+            fut = qry.execute(((QuerySpiPredicate)filter).getArgs());
+        }
+        else
+            throw new IgniteException("Unsupported query predicate: " + 
filter);
+
+        return new QueryCursorImpl<>(new 
GridCloseableIteratorAdapter<Entry<K,V>>() {
+            /** */
+            Map.Entry<K,V> cur;
+
+            @Override protected Entry<K,V> onNext() throws 
IgniteCheckedException {
+                if (!onHasNext())
+                    throw new NoSuchElementException();
 
-                fut = qry.execute(((QuerySpiPredicate)filter).getArgs());
+                Map.Entry<K,V> e = cur;
+
+                cur = null;
+
+                return new CacheEntryImpl<>(e.getKey(), e.getValue());
             }
-            else
-                throw new IgniteException("Unsupported query predicate: " + 
filter);
 
-            return new QueryCursorImpl<>(new 
GridCloseableIteratorAdapter<Entry<K,V>>() {
-                /** */
-                Map.Entry<K,V> cur;
+            @Override protected boolean onHasNext() throws 
IgniteCheckedException {
+                return cur != null || (cur = fut.next()) != null;
+            }
 
-                @Override protected Entry<K,V> onNext() throws 
IgniteCheckedException {
-                    if (!onHasNext())
-                        throw new NoSuchElementException();
+            @Override protected void onClose() throws IgniteCheckedException {
+                fut.cancel();
+            }
+        });
+    }
 
-                    Map.Entry<K,V> e = cur;
+    /** {@inheritDoc} */
+    @Override public QueryCursor<Entry<K,V>> query(QueryPredicate filter) {
+        A.notNull(filter, "filter");
 
-                    cur = null;
+        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
-                    return new CacheEntryImpl<>(e.getKey(), e.getValue());
-                }
+        try {
+            if (filter instanceof QuerySqlPredicate) {
+                QuerySqlPredicate p = (QuerySqlPredicate)filter;
 
-                @Override protected boolean onHasNext() throws 
IgniteCheckedException {
-                    return cur != null || (cur = fut.next()) != null;
-                }
+                return ctx.kernalContext().query().queryTwoStep(ctx.name(), 
p.getType(), p.getSql(), p.getArgs());
+            }
 
-                @Override protected void onClose() throws 
IgniteCheckedException {
-                    fut.cancel();
-                }
-            });
+            return query(filter, null);
         }
         finally {
             gate.leave(prev);
@@ -313,24 +330,50 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter<IgniteCach
     @Override public QueryCursor<List<?>> queryFields(QuerySqlPredicate 
filter) {
         A.notNull(filter, "filter");
 
-        // TODO implement
-        return null;
+        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+
+        try {
+            return ctx.kernalContext().query().queryTwoStep(ctx.name(), 
filter.getSql(), filter.getArgs());
+        }
+        finally {
+            gate.leave(prev);
+        }
     }
 
     /** {@inheritDoc} */
     @Override public QueryCursor<Entry<K,V>> localQuery(QueryPredicate filter) 
{
         A.notNull(filter, "filter");
 
-        // TODO implement
-        return null;
+        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+
+        try {
+            if (filter instanceof QuerySqlPredicate) {
+                QuerySqlPredicate p = (QuerySqlPredicate)filter;
+
+                return new 
QueryCursorImpl<>(ctx.kernalContext().query().<K,V>queryLocal(
+                    ctx.name(), p.getType(), p.getSql(), p.getArgs()));
+            }
+
+            return query(filter, ctx.kernalContext().grid().forLocal());
+        }
+        finally {
+            gate.leave(prev);
+        }
     }
 
     /** {@inheritDoc} */
     @Override public QueryCursor<List<?>> localQueryFields(QuerySqlPredicate 
filter) {
         A.notNull(filter, "filter");
 
-        // TODO implement
-        return null;
+        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+
+        try {
+            return new 
QueryCursorImpl<>(ctx.kernalContext().query().queryLocalFields(
+                ctx.name(), filter.getSql(), filter.getArgs()));
+        }
+        finally {
+            gate.leave(prev);
+        }
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8dcf8954/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
index 5a5d09a..5a7e1d3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
@@ -201,4 +201,11 @@ public interface GridQueryIndexing {
      * @param type Type descriptor.
      */
     public void rebuildIndexes(@Nullable String spaceName, 
GridQueryTypeDescriptor type);
+
+    /**
+     * Returns backup filter.
+     *
+     * @return Backup filter.
+     */
+    public IndexingQueryFilter backupFilter();
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8dcf8954/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 903c4e1..3a21586 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -24,6 +24,7 @@ import org.apache.ignite.cache.query.annotations.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.processors.*;
+import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.query.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.future.*;
@@ -494,6 +495,82 @@ public class GridQueryProcessor extends GridProcessorAdapter {
 
     /**
      * @param space Space.
+     * @param type Type.
+     * @param sqlQry Query.
+     * @param params Parameters.
+     * @return Cursor.
+     */
+    public <K,V> Iterator<Cache.Entry<K,V>> queryLocal(String space, String 
type, String sqlQry, Object[] params) {
+        if (!busyLock.enterBusy())
+            throw new IllegalStateException("Failed to execute query (grid is 
stopping).");
+
+        try {
+            TypeDescriptor typeDesc = typesByName.get(new TypeName(space, 
type));
+
+            if (typeDesc == null || !typeDesc.registered())
+                return new GridEmptyCloseableIterator<>();
+
+            final GridCloseableIterator<IgniteBiTuple<K,V>> i = 
idx.query(space, sqlQry, F.asList(params), typeDesc,
+                idx.backupFilter());
+
+            return new ClIter<Cache.Entry<K,V>>() {
+                @Override public void close() throws Exception {
+                    i.close();
+                }
+
+                @Override public boolean hasNext() {
+                    return i.hasNext();
+                }
+
+                @Override public Cache.Entry<K,V> next() {
+                    IgniteBiTuple<K,V> t = i.next();
+
+                    return new CacheEntryImpl<>(t.getKey(), t.getValue());
+                }
+
+                @Override public void remove() {
+                    throw new UnsupportedOperationException();
+                }
+            };
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
+        finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * Closeable iterator.
+     */
+    private static interface ClIter<X> extends AutoCloseable, Iterator<X> {
+        // No-op.
+    }
+
+    /**
+     * @param space Space.
+     * @param sql SQL Query.
+     * @param args Arguments.
+     * @return Iterator.
+     */
+    public Iterator<List<?>> queryLocalFields(String space, String sql, 
Object[] args) {
+        if (!busyLock.enterBusy())
+            throw new IllegalStateException("Failed to execute query (grid is 
stopping).");
+
+        try {
+            return idx.queryFields(space, sql, F.asList(args), 
idx.backupFilter()).iterator();
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
+        finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * @param space Space.
      * @param key Key.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8dcf8954/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index be0d2d0..1169fcf 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -34,11 +34,6 @@ import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.offheap.unsafe.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.internal.util.future.*;
-import org.apache.ignite.internal.util.lang.*;
-import org.apache.ignite.internal.util.offheap.unsafe.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.marshaller.*;
 import org.apache.ignite.marshaller.optimized.*;
@@ -1440,6 +1435,24 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         schemaNames.add(schema(spaceName));
     }
 
+    /** {@inheritDoc} */
+    @Override public IndexingQueryFilter backupFilter() {
+        return new IndexingQueryFilter() {
+            @Nullable @Override public <K, V> IgniteBiPredicate<K, V> 
forSpace(String spaceName) {
+                final GridCacheAdapter<Object, Object> cache = 
ctx.cache().internalCache(spaceName);
+
+                if (cache.context().isReplicated() || 
cache.configuration().getBackups() == 0)
+                    return null;
+
+                return new IgniteBiPredicate<K, V>() {
+                    @Override public boolean apply(K k, V v) {
+                        return 
cache.context().affinity().primary(ctx.discovery().localNode(), k, -1);
+                    }
+                };
+            }
+        };
+    }
+
     /**
      * Wrapper to store connection and flag is schema set or not.
      */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8dcf8954/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 12a5b83..01483b2 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -12,18 +12,15 @@ package org.apache.ignite.internal.processors.query.h2.twostep;
 import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.query.*;
 import org.apache.ignite.internal.processors.query.h2.*;
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.lang.*;
-import org.apache.ignite.spi.indexing.*;
 import org.h2.jdbc.*;
 import org.h2.result.*;
 import org.h2.value.*;
 import org.jdk8.backport.*;
-import org.jetbrains.annotations.*;
 
 import java.lang.reflect.*;
 import java.sql.*;
@@ -97,20 +94,7 @@ public class GridMapQueryExecutor {
      * @param req Query request.
      */
     private void executeLocalQuery(ClusterNode node, GridQueryRequest req) {
-        h2.setFilters(new IndexingQueryFilter() {
-            @Nullable @Override public <K, V> IgniteBiPredicate<K, V> 
forSpace(String spaceName) {
-                final GridCacheAdapter<Object, Object> cache = 
ctx.cache().internalCache(spaceName);
-
-                if (cache.context().isReplicated() || 
cache.configuration().getBackups() == 0)
-                    return null;
-
-                return new IgniteBiPredicate<K, V>() {
-                    @Override public boolean apply(K k, V v) {
-                        return 
cache.context().affinity().primary(ctx.discovery().localNode(), k, -1);
-                    }
-                };
-            }
-        });
+        h2.setFilters(h2.backupFilter());
 
         try {
             QueryResults qr = new QueryResults(req.requestId(), 
req.queries().size());

Reply via email to