IGNITE-10580: SQL: Fixed incorrect re-use of cached connection for local queries. This closes #5592.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/86a815e1 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/86a815e1 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/86a815e1 Branch: refs/heads/ignite-601 Commit: 86a815e113dbfdb892f8f6c6ac4a1316fa4e7195 Parents: 2dabbd2 Author: tledkov-gridgain <tled...@gridgain.com> Authored: Tue Dec 25 17:25:49 2018 +0300 Committer: devozerov <voze...@gridgain.com> Committed: Tue Dec 25 17:25:49 2018 +0300 ---------------------------------------------------------------------- .../processors/query/h2/H2FieldsIterator.java | 12 ++- .../processors/query/h2/IgniteH2Indexing.java | 25 ++++- .../h2/twostep/GridReduceQueryExecutor.java | 2 +- ...SqlLocalQueryConnectionAndStatementTest.java | 104 +++++++++++++++++++ .../IgniteBinaryCacheQueryTestSuite2.java | 3 + 5 files changed, 139 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/86a815e1/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2FieldsIterator.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2FieldsIterator.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2FieldsIterator.java index e9f293c..ef99a4b 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2FieldsIterator.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2FieldsIterator.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.query.h2; +import com.sun.org.apache.xml.internal.utils.ObjectPool; import java.sql.ResultSet; import java.util.ArrayList; import java.util.Collections; @@ -34,17 +35,23 @@ public class H2FieldsIterator extends H2ResultSetIterator<List<?>> { /** */ private transient MvccQueryTracker mvccTracker; + /** Detached connection. */ + private final ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> detachedConn; + /** * @param data Data. * @param mvccTracker Mvcc tracker. * @param forUpdate {@code SELECT FOR UPDATE} flag. + * @param detachedConn Detached connection. * @throws IgniteCheckedException If failed. */ - public H2FieldsIterator(ResultSet data, MvccQueryTracker mvccTracker, boolean forUpdate) + public H2FieldsIterator(ResultSet data, MvccQueryTracker mvccTracker, boolean forUpdate, + ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> detachedConn) throws IgniteCheckedException { super(data, forUpdate); this.mvccTracker = mvccTracker; + this.detachedConn = detachedConn; } /** {@inheritDoc} */ @@ -62,6 +69,9 @@ public class H2FieldsIterator extends H2ResultSetIterator<List<?>> { super.onClose(); } finally { + if (detachedConn != null) + detachedConn.recycle(); + if (mvccTracker != null) mvccTracker.onDone(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/86a815e1/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index 9a2ff90..d1b435d 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -635,6 +635,8 @@ public class IgniteH2Indexing implements GridQueryIndexing { GridH2QueryContext.set(ctx); + ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> detachedConn = connMgr.detachThreadConnection(); + try { ResultSet rs = executeSqlQueryWithTimer(stmt0, conn, qry0, params, timeout0, cancel); @@ -657,10 +659,20 @@ public class IgniteH2Indexing implements GridQueryIndexing { enlistFut.listen(new IgniteInClosure<IgniteInternalFuture<Long>>() { @Override public void apply(IgniteInternalFuture<Long> fut) { - if (fut.error() != null) - sfuFut0.onResult(IgniteH2Indexing.this.ctx.localNodeId(), 0L, false, fut.error()); - else - sfuFut0.onResult(IgniteH2Indexing.this.ctx.localNodeId(), fut.result(), false, null); + if (fut.error() != null) { + sfuFut0.onResult( + IgniteH2Indexing.this.ctx.localNodeId(), + 0L, + false, + fut.error()); + } + else { + sfuFut0.onResult( + IgniteH2Indexing.this.ctx.localNodeId(), + fut.result(), + false, + null); + } } }); @@ -679,9 +691,12 @@ public class IgniteH2Indexing implements GridQueryIndexing { } } - return new H2FieldsIterator(rs, mvccTracker0, sfuFut0 != null); + return new H2FieldsIterator(rs, mvccTracker0, sfuFut0 != null, + detachedConn); } catch (IgniteCheckedException | RuntimeException | Error e) { + detachedConn.recycle(); + try { if (mvccTracker0 != null) mvccTracker0.onDone(); http://git-wip-us.apache.org/repos/asf/ignite/blob/86a815e1/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index 7009bd5..d90331c 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -760,7 +760,7 @@ public class GridReduceQueryExecutor { timeoutMillis, cancel); - resIter = new H2FieldsIterator(res, mvccTracker, false); + resIter = new H2FieldsIterator(res, mvccTracker, false, null); mvccTracker = null; // To prevent callback inside finally block; } http://git-wip-us.apache.org/repos/asf/ignite/blob/86a815e1/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/SqlLocalQueryConnectionAndStatementTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/SqlLocalQueryConnectionAndStatementTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/SqlLocalQueryConnectionAndStatementTest.java new file mode 100644 index 0000000..bbff841 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/SqlLocalQueryConnectionAndStatementTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.internal.processors.query; + +import java.util.Iterator; +import java.util.List; +import org.apache.ignite.cache.query.FieldsQueryCursor; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Test for statement reuse. + */ +@RunWith(JUnit4.class) +public class SqlLocalQueryConnectionAndStatementTest extends GridCommonAbstractTest { + /** {@inheritDoc} */ + @Override public void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGrids(1); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } + + /** + */ + @Test + public void testReplicated() { + sql("CREATE TABLE repl_tbl (id LONG PRIMARY KEY, val LONG) WITH \"template=replicated\"").getAll(); + + for (int i = 0; i < 10; i++) + sql("insert into repl_tbl(id,val) VALUES(" + i + "," + i + ")").getAll(); + + Iterator<List<?>> it0 = sql(new SqlFieldsQuery("SELECT * FROM repl_tbl where id > ?").setArgs(1)).iterator(); + + it0.next(); + + sql(new SqlFieldsQuery("SELECT * FROM repl_tbl where id > ?").setArgs(1)).getAll(); + + it0.next(); + } + + /** + */ + @Test + public void testLocalQuery() { + sql("CREATE TABLE tbl (id LONG PRIMARY KEY, val LONG)").getAll(); + + for (int i = 0; i < 10; i++) + sql("insert into tbl(id,val) VALUES(" + i + "," + i + ")").getAll(); + + Iterator<List<?>> it0 = sql( + new SqlFieldsQuery("SELECT * FROM tbl where id > ?") + .setArgs(1) + .setLocal(true)) + .iterator(); + + it0.next(); + + sql(new SqlFieldsQuery("SELECT * FROM tbl where id > ?").setArgs(1).setLocal(true)).getAll(); + + it0.next(); + } + + /** + * @param sql SQL query. + * @return Results. + */ + private FieldsQueryCursor<List<?>> sql(String sql) { + return sql(new SqlFieldsQuery(sql)); + } + + /** + * @param qry SQL query. + * @return Results. + */ + private FieldsQueryCursor<List<?>> sql(SqlFieldsQuery qry) { + GridQueryProcessor qryProc = grid(0).context().query(); + + return qryProc.querySqlFields(qry, true); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/86a815e1/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite2.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite2.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite2.java index e4c918e..8273e9e 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite2.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite2.java @@ -52,6 +52,7 @@ import org.apache.ignite.internal.processors.query.IgniteCacheGroupsCompareQuery import org.apache.ignite.internal.processors.query.IgniteCacheGroupsSqlDistributedJoinSelfTest; import org.apache.ignite.internal.processors.query.IgniteCacheGroupsSqlSegmentedIndexMultiNodeSelfTest; import org.apache.ignite.internal.processors.query.IgniteCacheGroupsSqlSegmentedIndexSelfTest; +import org.apache.ignite.internal.processors.query.SqlLocalQueryConnectionAndStatementTest; import org.apache.ignite.internal.processors.query.h2.CacheQueryEntityWithDateTimeApiFieldsTest; import org.apache.ignite.internal.processors.query.h2.twostep.CacheQueryMemoryLeakTest; import org.apache.ignite.internal.processors.query.h2.twostep.CreateTableWithDateKeySelfTest; @@ -134,6 +135,8 @@ public class IgniteBinaryCacheQueryTestSuite2 { suite.addTest(new JUnit4TestAdapter(IgniteCacheQueriesLoadTest1.class)); + suite.addTest(new JUnit4TestAdapter(SqlLocalQueryConnectionAndStatementTest.class)); + return suite; } }