IGNITE-1308: Moved regular (not continuous!) queries to Ignite.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8529e108 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8529e108 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8529e108 Branch: refs/heads/ignite-884 Commit: 8529e10855e71c63c4fc5a83a9cdb2300109bb19 Parents: 975f47e Author: vozerov-gridgain <voze...@gridgain.com> Authored: Wed Aug 26 15:43:34 2015 +0300 Committer: vozerov-gridgain <voze...@gridgain.com> Committed: Wed Aug 26 15:43:34 2015 +0300 ---------------------------------------------------------------------- .../query/PlatformAbstractQueryCursor.java | 192 +++++++++++++++++++ .../cache/query/PlatformFieldsQueryCursor.java | 50 +++++ .../cache/query/PlatformQueryCursor.java | 46 +++++ 3 files changed, 288 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8529e108/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformAbstractQueryCursor.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformAbstractQueryCursor.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformAbstractQueryCursor.java new file mode 100644 index 0000000..cdd29fd --- /dev/null +++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformAbstractQueryCursor.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.cache.query; + +import org.apache.ignite.*; +import org.apache.ignite.internal.portable.*; +import org.apache.ignite.internal.processors.cache.query.*; +import org.apache.ignite.internal.processors.platform.*; +import org.apache.ignite.internal.processors.platform.utils.*; + +import java.util.*; + +/** + * + */ +public abstract class PlatformAbstractQueryCursor<T> extends PlatformAbstractTarget implements AutoCloseable { + /** Get multiple entries. */ + private static final int OP_GET_ALL = 1; + + /** Get all entries. */ + private static final int OP_GET_BATCH = 2; + + /** Get single entry. */ + private static final int OP_GET_SINGLE = 3; + + /** Underlying cursor. */ + private final QueryCursorEx<T> cursor; + + /** Batch size size. */ + private final int batchSize; + + /** Underlying iterator. */ + private Iterator<T> iter; + + /** + * Constructor. + * + * @param interopCtx Interop context. + * @param cursor Underlying cursor. + * @param batchSize Batch size. + */ + public PlatformAbstractQueryCursor(PlatformContext interopCtx, QueryCursorEx<T> cursor, int batchSize) { + super(interopCtx); + + this.cursor = cursor; + this.batchSize = batchSize; + } + + /** {@inheritDoc} */ + @Override protected void processOutOp(int type, final PortableRawWriterEx writer) throws IgniteCheckedException { + switch (type) { + case OP_GET_BATCH: { + assert iter != null : "iterator() has not been called"; + + try { + int cntPos = writer.reserveInt(); + + int cnt; + + for (cnt = 0; cnt < batchSize; cnt++) { + if (iter.hasNext()) + write(writer, iter.next()); + else + break; + } + + writer.writeInt(cntPos, cnt); + } + catch (Exception err) { + throw PlatformUtils.unwrapQueryException(err); + } + + break; + } + + case OP_GET_SINGLE: { + assert iter != null : "iterator() has not been called"; + + try { + if (iter.hasNext()) { + write(writer, iter.next()); + + return; + } + } + catch (Exception err) { + throw PlatformUtils.unwrapQueryException(err); + } + + throw new IgniteCheckedException("No more data available."); + } + + case OP_GET_ALL: { + try { + int pos = writer.reserveInt(); + + Consumer<T> consumer = new Consumer<>(this, writer); + + cursor.getAll(consumer); + + writer.writeInt(pos, consumer.cnt); + } + catch (Exception err) { + throw PlatformUtils.unwrapQueryException(err); + } + + break; + } + + default: + throwUnsupported(type); + } + } + + /** + * Get cursor iterator. + */ + public void iterator() { + iter = cursor.iterator(); + } + + /** + * Check whether next iterator entry exists. + * + * @return {@code True} if exists. + */ + @SuppressWarnings("UnusedDeclaration") + public boolean iteratorHasNext() { + assert iter != null : "iterator() has not been called"; + + return iter.hasNext(); + } + + /** {@inheritDoc} */ + @Override public void close() throws Exception { + cursor.close(); + } + + /** + * Write value to the stream. Extension point to perform conversions on the object before writing it. + * + * @param writer Writer. + * @param val Value. + */ + protected abstract void write(PortableRawWriterEx writer, T val); + + /** + * Query cursor consumer. + */ + private static class Consumer<T> implements QueryCursorEx.Consumer<T> { + /** Current query cursor. */ + private final PlatformAbstractQueryCursor<T> cursor; + + /** Writer. */ + private final PortableRawWriterEx writer; + + /** Count. */ + private int cnt; + + /** + * Constructor. + * + * @param writer Writer. + */ + public Consumer(PlatformAbstractQueryCursor<T> cursor, PortableRawWriterEx writer) { + this.cursor = cursor; + this.writer = writer; + } + + /** {@inheritDoc} */ + @Override public void consume(T val) throws IgniteCheckedException { + cursor.write(writer, val); + + cnt++; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8529e108/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformFieldsQueryCursor.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformFieldsQueryCursor.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformFieldsQueryCursor.java new file mode 100644 index 0000000..f18a79a --- /dev/null +++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformFieldsQueryCursor.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.cache.query; + +import org.apache.ignite.internal.portable.*; +import org.apache.ignite.internal.processors.cache.query.*; +import org.apache.ignite.internal.processors.platform.*; + +import java.util.*; + +/** + * Interop cursor for fields query. + */ +public class PlatformFieldsQueryCursor extends PlatformAbstractQueryCursor<List<?>> { + /** + * Constructor. + * + * @param interopCtx Interop context. + * @param cursor Backing cursor. + * @param batchSize Batch size. + */ + public PlatformFieldsQueryCursor(PlatformContext interopCtx, QueryCursorEx<List<?>> cursor, int batchSize) { + super(interopCtx, cursor, batchSize); + } + + /** {@inheritDoc} */ + @Override protected void write(PortableRawWriterEx writer, List vals) { + assert vals != null; + + writer.writeInt(vals.size()); + + for (Object val : vals) + writer.writeObjectDetached(val); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8529e108/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformQueryCursor.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformQueryCursor.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformQueryCursor.java new file mode 100644 index 0000000..cc96d6f --- /dev/null +++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformQueryCursor.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.cache.query; + +import org.apache.ignite.internal.portable.*; +import org.apache.ignite.internal.processors.cache.query.*; +import org.apache.ignite.internal.processors.platform.*; + +import javax.cache.*; + +/** + * Interop cursor for regular queries. + */ +public class PlatformQueryCursor extends PlatformAbstractQueryCursor<Cache.Entry> { + /** + * Constructor. + * + * @param interopCtx Interop context. + * @param cursor Backing cursor. + * @param batchSize Batch size. + */ + public PlatformQueryCursor(PlatformContext interopCtx, QueryCursorEx<Cache.Entry> cursor, int batchSize) { + super(interopCtx, cursor, batchSize); + } + + /** {@inheritDoc} */ + @Override protected void write(PortableRawWriterEx writer, Cache.Entry val) { + writer.writeObjectDetached(val.getKey()); + writer.writeObjectDetached(val.getValue()); + } +}