IGNITE-1308: Moved regular (not continuous!) queries to Ignite.

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8529e108
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8529e108
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8529e108

Branch: refs/heads/ignite-884
Commit: 8529e10855e71c63c4fc5a83a9cdb2300109bb19
Parents: 975f47e
Author: vozerov-gridgain <voze...@gridgain.com>
Authored: Wed Aug 26 15:43:34 2015 +0300
Committer: vozerov-gridgain <voze...@gridgain.com>
Committed: Wed Aug 26 15:43:34 2015 +0300

----------------------------------------------------------------------
 .../query/PlatformAbstractQueryCursor.java      | 192 +++++++++++++++++++
 .../cache/query/PlatformFieldsQueryCursor.java  |  50 +++++
 .../cache/query/PlatformQueryCursor.java        |  46 +++++
 3 files changed, 288 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/8529e108/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformAbstractQueryCursor.java
----------------------------------------------------------------------
diff --git 
a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformAbstractQueryCursor.java
 
b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformAbstractQueryCursor.java
new file mode 100644
index 0000000..cdd29fd
--- /dev/null
+++ 
b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformAbstractQueryCursor.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.cache.query;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.portable.*;
+import org.apache.ignite.internal.processors.cache.query.*;
+import org.apache.ignite.internal.processors.platform.*;
+import org.apache.ignite.internal.processors.platform.utils.*;
+
+import java.util.*;
+
+/**
+ * Base platform query cursor: writes results of an underlying {@link QueryCursorEx} to a raw writer.
+ */
+public abstract class PlatformAbstractQueryCursor<T> extends 
PlatformAbstractTarget implements AutoCloseable {
+    /** Get all entries. */
+    private static final int OP_GET_ALL = 1;
+
+    /** Get a batch of entries (at most {@code batchSize}). */
+    private static final int OP_GET_BATCH = 2;
+
+    /** Get single entry. */
+    private static final int OP_GET_SINGLE = 3;
+
+    /** Underlying cursor. */
+    private final QueryCursorEx<T> cursor;
+
+    /** Batch size: upper bound on the number of entries written per batch operation. */
+    private final int batchSize;
+
+    /** Underlying iterator; stays {@code null} until {@link #iterator()} is called. */
+    private Iterator<T> iter;
+
+    /**
+     * Constructor.
+     *
+     * @param interopCtx Interop context.
+     * @param cursor Underlying cursor.
+     * @param batchSize Batch size.
+     */
+    public PlatformAbstractQueryCursor(PlatformContext interopCtx, 
QueryCursorEx<T> cursor, int batchSize) {
+        super(interopCtx);
+
+        this.cursor = cursor;
+        this.batchSize = batchSize;
+    }
+
+    /** {@inheritDoc} Batch and get-all results store the entry count at a position reserved before the entries. */
+    @Override protected void processOutOp(int type, final PortableRawWriterEx 
writer) throws IgniteCheckedException {
+        switch (type) {
+            case OP_GET_BATCH: {
+                assert iter != null : "iterator() has not been called";
+
+                try {
+                    int cntPos = writer.reserveInt();
+
+                    int cnt;
+
+                    for (cnt = 0; cnt < batchSize; cnt++) {
+                        if (iter.hasNext())
+                            write(writer, iter.next());
+                        else
+                            break;
+                    }
+
+                    writer.writeInt(cntPos, cnt);
+                }
+                catch (Exception err) {
+                    throw PlatformUtils.unwrapQueryException(err);
+                }
+
+                break;
+            }
+
+            case OP_GET_SINGLE: {
+                assert iter != null : "iterator() has not been called";
+
+                try {
+                    if (iter.hasNext()) {
+                        write(writer, iter.next());
+
+                        return;
+                    }
+                }
+                catch (Exception err) {
+                    throw PlatformUtils.unwrapQueryException(err);
+                }
+
+                throw new IgniteCheckedException("No more data available.");
+            }
+
+            case OP_GET_ALL: {
+                try {
+                    int pos = writer.reserveInt();
+
+                    Consumer<T> consumer = new Consumer<>(this, writer);
+
+                    cursor.getAll(consumer);
+
+                    writer.writeInt(pos, consumer.cnt);
+                }
+                catch (Exception err) {
+                    throw PlatformUtils.unwrapQueryException(err);
+                }
+
+                break;
+            }
+
+            default:
+                throwUnsupported(type);
+        }
+    }
+
+    /**
+     * Obtain an iterator from the underlying cursor and keep it for the batch and single-entry operations.
+     */
+    public void iterator() {
+        iter = cursor.iterator();
+    }
+
+    /**
+     * Check whether next iterator entry exists. Requires a prior call to {@link #iterator()}.
+     *
+     * @return {@code True} if exists.
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public boolean iteratorHasNext() {
+        assert iter != null : "iterator() has not been called";
+
+        return iter.hasNext();
+    }
+
+    /** {@inheritDoc} Closes the underlying cursor. */
+    @Override public void close() throws Exception {
+        cursor.close();
+    }
+
+    /**
+     * Write value to the stream. Extension point to perform conversions on 
the object before writing it.
+     *
+     * @param writer Writer.
+     * @param val Value.
+     */
+    protected abstract void write(PortableRawWriterEx writer, T val);
+
+    /**
+     * Query cursor consumer: writes every consumed value through the owning cursor and counts the values.
+     */
+    private static class Consumer<T> implements QueryCursorEx.Consumer<T> {
+        /** Current query cursor. */
+        private final PlatformAbstractQueryCursor<T> cursor;
+
+        /** Writer. */
+        private final PortableRawWriterEx writer;
+
+        /** Number of values consumed so far. */
+        private int cnt;
+
+        /**
+         * Constructor.
+         * @param cursor Query cursor whose {@code write} method serializes each value.
+         * @param writer Writer.
+         */
+        public Consumer(PlatformAbstractQueryCursor<T> cursor, 
PortableRawWriterEx writer) {
+            this.cursor = cursor;
+            this.writer = writer;
+        }
+
+        /** {@inheritDoc} Writes the value and increments the counter. */
+        @Override public void consume(T val) throws IgniteCheckedException {
+            cursor.write(writer, val);
+
+            cnt++;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8529e108/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformFieldsQueryCursor.java
----------------------------------------------------------------------
diff --git 
a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformFieldsQueryCursor.java
 
b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformFieldsQueryCursor.java
new file mode 100644
index 0000000..f18a79a
--- /dev/null
+++ 
b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformFieldsQueryCursor.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.cache.query;
+
+import org.apache.ignite.internal.portable.*;
+import org.apache.ignite.internal.processors.cache.query.*;
+import org.apache.ignite.internal.processors.platform.*;
+
+import java.util.*;
+
+/**
+ * Interop cursor for fields query. Each row is written as an int field count followed by the field values.
+ */
+public class PlatformFieldsQueryCursor extends 
PlatformAbstractQueryCursor<List<?>> {
+    /**
+     * Constructor.
+     *
+     * @param interopCtx Interop context.
+     * @param cursor Backing cursor.
+     * @param batchSize Batch size.
+     */
+    public PlatformFieldsQueryCursor(PlatformContext interopCtx, 
QueryCursorEx<List<?>> cursor, int batchSize) {
+        super(interopCtx, cursor, batchSize);
+    }
+
+    /** {@inheritDoc} Writes the list size, then every element as a detached object; the list must not be null. */
+    @Override protected void write(PortableRawWriterEx writer, List vals) {
+        assert vals != null;
+
+        writer.writeInt(vals.size());
+
+        for (Object val : vals)
+            writer.writeObjectDetached(val);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/8529e108/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformQueryCursor.java
----------------------------------------------------------------------
diff --git 
a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformQueryCursor.java
 
b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformQueryCursor.java
new file mode 100644
index 0000000..cc96d6f
--- /dev/null
+++ 
b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformQueryCursor.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.cache.query;
+
+import org.apache.ignite.internal.portable.*;
+import org.apache.ignite.internal.processors.cache.query.*;
+import org.apache.ignite.internal.processors.platform.*;
+
+import javax.cache.*;
+
+/**
+ * Interop cursor for regular queries. Each cache entry is written as its key followed by its value.
+ */
+public class PlatformQueryCursor extends 
PlatformAbstractQueryCursor<Cache.Entry> {
+    /**
+     * Constructor.
+     *
+     * @param interopCtx Interop context.
+     * @param cursor Backing cursor.
+     * @param batchSize Batch size.
+     */
+    public PlatformQueryCursor(PlatformContext interopCtx, 
QueryCursorEx<Cache.Entry> cursor, int batchSize) {
+        super(interopCtx, cursor, batchSize);
+    }
+
+    /** {@inheritDoc} Writes the entry key and then the entry value, both as detached objects. */
+    @Override protected void write(PortableRawWriterEx writer, Cache.Entry 
val) {
+        writer.writeObjectDetached(val.getKey());
+        writer.writeObjectDetached(val.getValue());
+    }
+}

Reply via email to