IGNITE-6019: SQL: do not pull the whole result set immediately to the client when there is no merge table. This closes #2430.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/59facfca Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/59facfca Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/59facfca Branch: refs/heads/ignite-5578 Commit: 59facfca3b60f22169f773f0e8b6f2e0b9a5c8e9 Parents: f3d3d1b Author: Alexander Paschenko <[email protected]> Authored: Wed Aug 16 09:41:54 2017 +0300 Committer: devozerov <[email protected]> Committed: Wed Aug 16 09:41:54 2017 +0300 ---------------------------------------------------------------------- .../h2/twostep/GridMergeIndexIterator.java | 165 +++++++++++++++++++ .../h2/twostep/GridReduceQueryExecutor.java | 61 +++---- 2 files changed, 188 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/59facfca/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexIterator.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexIterator.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexIterator.java new file mode 100644 index 0000000..1c0efb3 --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexIterator.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.twostep; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cluster.ClusterNode; +import org.h2.index.Cursor; +import org.h2.result.Row; + +/** + * Iterator that transparently and sequentially traverses a bunch of {@link GridMergeIndex} objects. + */ +class GridMergeIndexIterator implements Iterator<List<?>>, AutoCloseable { + /** Reduce query executor. */ + private final GridReduceQueryExecutor rdcExec; + + /** Participating nodes. */ + private final Collection<ClusterNode> nodes; + + /** Query run. */ + private final ReduceQueryRun run; + + /** Query request ID. */ + private final long qryReqId; + + /** Distributed joins. */ + private final boolean distributedJoins; + + /** Iterator over indexes. */ + private final Iterator<GridMergeIndex> idxIter; + + /** Current cursor. */ + private Cursor cursor; + + /** Next row to return. */ + private List<Object> next; + + /** Whether remote resources were released. */ + private boolean released; + + /** + * Constructor. + * + * @param rdcExec Reduce query executor. + * @param nodes Participating nodes. + * @param run Query run. + * @param qryReqId Query request ID. + * @param distributedJoins Distributed joins. + * @throws IgniteCheckedException if failed. + */ + GridMergeIndexIterator(GridReduceQueryExecutor rdcExec, Collection<ClusterNode> nodes, ReduceQueryRun run, + long qryReqId, boolean distributedJoins) + throws IgniteCheckedException { + this.rdcExec = rdcExec; + this.nodes = nodes; + this.run = run; + this.qryReqId = qryReqId; + this.distributedJoins = distributedJoins; + + this.idxIter = run.indexes().iterator(); + + advance(); + } + + /** {@inheritDoc} */ + @Override public boolean hasNext() { + return next != null; + } + + /** {@inheritDoc} */ + @Override public List<?> next() { + List<Object> res = next; + + if (res == null) + throw new NoSuchElementException(); + + advance(); + + return res; + } + + /** {@inheritDoc} */ + @Override public void remove() { + throw new UnsupportedOperationException("Remove is not supported"); + } + + /** {@inheritDoc} */ + @Override public void close() throws Exception { + releaseIfNeeded(); + } + + /** + * Advance iterator. + */ + private void advance() { + next = null; + + try { + boolean hasNext = false; + + while (cursor == null || !(hasNext = cursor.next())) { + if (idxIter.hasNext()) + cursor = idxIter.next().findInStream(null, null); + else { + releaseIfNeeded(); + + break; + } + } + + if (hasNext) { + Row row = cursor.get(); + + int cols = row.getColumnCount(); + + List<Object> res = new ArrayList<>(cols); + + for (int c = 0; c < cols; c++) + res.add(row.getValue(c).getObject()); + + next = res; + } + } + catch (Exception e) { + releaseIfNeeded(); + + throw e; + } + } + + /** + * Close routine. + */ + private void releaseIfNeeded() { + if (!released) { + try { + rdcExec.releaseRemoteResources(nodes, run, qryReqId, distributedJoins); + } + finally { + released = true; + } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/59facfca/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index 85a7e0b..0e9d1a2 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -87,10 +87,8 @@ import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.plugin.extensions.communication.Message; import org.h2.command.ddl.CreateTableData; import org.h2.engine.Session; -import org.h2.index.Cursor; import org.h2.index.Index; import org.h2.jdbc.JdbcConnection; -import org.h2.result.Row; import org.h2.table.Column; import org.h2.util.IntArray; import org.h2.value.Value; @@ -169,6 +167,7 @@ public class GridReduceQueryExecutor { log = ctx.log(GridReduceQueryExecutor.class); ctx.io().addMessageListener(GridTopic.TOPIC_QUERY, new GridMessageListener() { + @SuppressWarnings("deprecation") @Override public void onMessage(UUID nodeId, Object msg, byte plc) { if (!busyLock.enterBusy()) return; @@ -511,7 +510,7 @@ public class GridReduceQueryExecutor { */ public Iterator<List<?>> query( String schemaName, - GridCacheTwoStepQuery qry, + final GridCacheTwoStepQuery qry, boolean keepBinary, boolean enforceJoinOrder, int timeoutMillis, @@ -622,6 +621,8 @@ public class GridReduceQueryExecutor { int replicatedQrysCnt = 0; + final Collection<ClusterNode> finalNodes = nodes; + for (GridCacheSqlQuery mapQry : qry.mapQueries()) { GridMergeIndex idx; @@ -665,6 +666,8 @@ public class GridReduceQueryExecutor { runs.put(qryReqId, r); + boolean release = true; + try { cancel.checkCancelled(); @@ -686,8 +689,6 @@ public class GridReduceQueryExecutor { final boolean distributedJoins = qry.distributedJoins(); - final Collection<ClusterNode> finalNodes = nodes; - cancel.set(new Runnable() { @Override public void run() { send(finalNodes, new GridQueryCancelRequest(qryReqId), null, false); @@ -757,27 +758,9 @@ public class GridReduceQueryExecutor { if (!retry) { if (skipMergeTbl) { - List<List<?>> res = new ArrayList<>(); - - // Simple UNION ALL can have multiple indexes. - for (GridMergeIndex idx : r.indexes()) { - Cursor cur = idx.findInStream(null, null); - - while (cur.next()) { - Row row = cur.get(); - - int cols = row.getColumnCount(); - - List<Object> resRow = new ArrayList<>(cols); - - for (int c = 0; c < cols; c++) - resRow.add(row.getValue(c).getObject()); + resIter = new GridMergeIndexIterator(this, finalNodes, r, qryReqId, qry.distributedJoins()); - res.add(resRow); - } - } - - resIter = res.iterator(); + release = false; } else { cancel.checkCancelled(); @@ -820,6 +803,8 @@ public class GridReduceQueryExecutor { return new GridQueryCacheObjectsIterator(resIter, h2.objectContext(), keepBinary); } catch (IgniteCheckedException | RuntimeException e) { + release = true; + U.closeQuiet(r.connection()); if (e instanceof CacheException) { @@ -842,15 +827,13 @@ public class GridReduceQueryExecutor { throw new CacheException("Failed to run reduce query locally.", cause); } finally { - // Make sure any activity related to current attempt is cancelled. - cancelRemoteQueriesIfNeeded(nodes, r, qryReqId, qry.distributedJoins()); - - if (!runs.remove(qryReqId, r)) - U.warn(log, "Query run was already removed: " + qryReqId); + if (release) { + releaseRemoteResources(finalNodes, r, qryReqId, qry.distributedJoins()); - if (!skipMergeTbl) { - for (int i = 0, mapQrys = qry.mapQueries().size(); i < mapQrys; i++) - fakeTable(null, i).innerTable(null); // Drop all merge tables. + if (!skipMergeTbl) { + for (int i = 0, mapQrys = qry.mapQueries().size(); i < mapQrys; i++) + fakeTable(null, i).innerTable(null); // Drop all merge tables. + } } } } @@ -885,16 +868,15 @@ public class GridReduceQueryExecutor { } /** + * Release remote resources if needed. + * * @param nodes Query nodes. * @param r Query run. * @param qryReqId Query id. * @param distributedJoins Distributed join flag. */ - private void cancelRemoteQueriesIfNeeded(Collection<ClusterNode> nodes, - ReduceQueryRun r, - long qryReqId, - boolean distributedJoins) - { + public void releaseRemoteResources(Collection<ClusterNode> nodes, ReduceQueryRun r, long qryReqId, + boolean distributedJoins) { // For distributedJoins need always send cancel request to cleanup resources. if (distributedJoins) send(nodes, new GridQueryCancelRequest(qryReqId), null, false); @@ -907,6 +889,9 @@ public class GridReduceQueryExecutor { } } } + + if (!runs.remove(qryReqId, r)) + U.warn(log, "Query run was already removed: " + qryReqId); } /**
