IGNITE-10580: SQL: Fixed incorrect re-use of cached connection for local 
queries. This closes #5592.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/86a815e1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/86a815e1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/86a815e1

Branch: refs/heads/ignite-601
Commit: 86a815e113dbfdb892f8f6c6ac4a1316fa4e7195
Parents: 2dabbd2
Author: tledkov-gridgain <tled...@gridgain.com>
Authored: Tue Dec 25 17:25:49 2018 +0300
Committer: devozerov <voze...@gridgain.com>
Committed: Tue Dec 25 17:25:49 2018 +0300

----------------------------------------------------------------------
 .../processors/query/h2/H2FieldsIterator.java   |  12 ++-
 .../processors/query/h2/IgniteH2Indexing.java   |  25 ++++-
 .../h2/twostep/GridReduceQueryExecutor.java     |   2 +-
 ...SqlLocalQueryConnectionAndStatementTest.java | 104 +++++++++++++++++++
 .../IgniteBinaryCacheQueryTestSuite2.java       |   3 +
 5 files changed, 139 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/86a815e1/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2FieldsIterator.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2FieldsIterator.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2FieldsIterator.java
index e9f293c..ef99a4b 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2FieldsIterator.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2FieldsIterator.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.query.h2;
 
+import com.sun.org.apache.xml.internal.utils.ObjectPool;
 import java.sql.ResultSet;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -34,17 +35,23 @@ public class H2FieldsIterator extends 
H2ResultSetIterator<List<?>> {
     /** */
     private transient MvccQueryTracker mvccTracker;
 
+    /** Detached connection. */
+    private final ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> 
detachedConn;
+
     /**
      * @param data Data.
      * @param mvccTracker Mvcc tracker.
      * @param forUpdate {@code SELECT FOR UPDATE} flag.
+     * @param detachedConn Detached connection.
      * @throws IgniteCheckedException If failed.
      */
-    public H2FieldsIterator(ResultSet data, MvccQueryTracker mvccTracker, 
boolean forUpdate)
+    public H2FieldsIterator(ResultSet data, MvccQueryTracker mvccTracker, 
boolean forUpdate,
+        ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> detachedConn)
         throws IgniteCheckedException {
         super(data, forUpdate);
 
         this.mvccTracker = mvccTracker;
+        this.detachedConn = detachedConn;
     }
 
     /** {@inheritDoc} */
@@ -62,6 +69,9 @@ public class H2FieldsIterator extends 
H2ResultSetIterator<List<?>> {
             super.onClose();
         }
         finally {
+            if (detachedConn != null)
+                detachedConn.recycle();
+
             if (mvccTracker != null)
                 mvccTracker.onDone();
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/86a815e1/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 9a2ff90..d1b435d 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -635,6 +635,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
                     GridH2QueryContext.set(ctx);
 
+                    ThreadLocalObjectPool.Reusable<H2ConnectionWrapper> 
detachedConn = connMgr.detachThreadConnection();
+
                     try {
                         ResultSet rs = executeSqlQueryWithTimer(stmt0, conn, 
qry0, params, timeout0, cancel);
 
@@ -657,10 +659,20 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
 
                             enlistFut.listen(new 
IgniteInClosure<IgniteInternalFuture<Long>>() {
                                 @Override public void 
apply(IgniteInternalFuture<Long> fut) {
-                                    if (fut.error() != null)
-                                        
sfuFut0.onResult(IgniteH2Indexing.this.ctx.localNodeId(), 0L, false, 
fut.error());
-                                    else
-                                        
sfuFut0.onResult(IgniteH2Indexing.this.ctx.localNodeId(), fut.result(), false, 
null);
+                                    if (fut.error() != null) {
+                                        sfuFut0.onResult(
+                                            
IgniteH2Indexing.this.ctx.localNodeId(),
+                                            0L,
+                                            false,
+                                            fut.error());
+                                    }
+                                    else {
+                                        sfuFut0.onResult(
+                                            
IgniteH2Indexing.this.ctx.localNodeId(),
+                                            fut.result(),
+                                            false,
+                                            null);
+                                    }
                                 }
                             });
 
@@ -679,9 +691,12 @@ public class IgniteH2Indexing implements GridQueryIndexing 
{
                             }
                         }
 
-                        return new H2FieldsIterator(rs, mvccTracker0, sfuFut0 
!= null);
+                        return new H2FieldsIterator(rs, mvccTracker0, sfuFut0 
!= null,
+                            detachedConn);
                     }
                     catch (IgniteCheckedException | RuntimeException | Error 
e) {
+                        detachedConn.recycle();
+
                         try {
                             if (mvccTracker0 != null)
                                 mvccTracker0.onDone();

http://git-wip-us.apache.org/repos/asf/ignite/blob/86a815e1/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 7009bd5..d90331c 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -760,7 +760,7 @@ public class GridReduceQueryExecutor {
                                 timeoutMillis,
                                 cancel);
 
-                            resIter = new H2FieldsIterator(res, mvccTracker, 
false);
+                            resIter = new H2FieldsIterator(res, mvccTracker, 
false, null);
 
                             mvccTracker = null; // To prevent callback inside 
finally block;
                         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/86a815e1/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/SqlLocalQueryConnectionAndStatementTest.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/SqlLocalQueryConnectionAndStatementTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/SqlLocalQueryConnectionAndStatementTest.java
new file mode 100644
index 0000000..bbff841
--- /dev/null
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/SqlLocalQueryConnectionAndStatementTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.query;
+
+import java.util.Iterator;
+import java.util.List;
+import org.apache.ignite.cache.query.FieldsQueryCursor;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Test for statement reuse.
+ * <p>
+ * Each test opens an iterator over a parametrized query, reads one row, runs a query with
+ * the identical text and argument to completion, and then reads one more row from the
+ * first iterator. The tests contain no explicit assertions: they pass when none of these
+ * calls throws.
+ */
+@RunWith(JUnit4.class)
+public class SqlLocalQueryConnectionAndStatementTest extends 
GridCommonAbstractTest {
+    /**
+     * {@inheritDoc}
+     * <p>
+     * After the superclass set-up, starts one grid via {@code startGrids(1)}.
+     */
+    @Override public void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrids(1);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Stops all grids before delegating to the superclass tear-down.
+     */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /**
+     * Interleaves two executions of the same parametrized query against a table created
+     * with the {@code replicated} template.
+     * <p>
+     * Ten rows are inserted first. The first result is only partially read (one row) when
+     * the second execution is fetched in full with {@code getAll()}; the first iterator is
+     * then advanced once more. Neither query sets the local flag in this test.
+     */
+    @Test
+    public void testReplicated() {
+        sql("CREATE TABLE repl_tbl (id LONG PRIMARY KEY, val LONG) WITH 
\"template=replicated\"").getAll();
+
+        for (int i = 0; i < 10; i++)
+            sql("insert into repl_tbl(id,val) VALUES(" + i + "," + i + 
")").getAll();
+
+        Iterator<List<?>> it0 = sql(new SqlFieldsQuery("SELECT * FROM repl_tbl 
where id > ?").setArgs(1)).iterator();
+
+        it0.next();
+
+        sql(new SqlFieldsQuery("SELECT * FROM repl_tbl where id > 
?").setArgs(1)).getAll();
+
+        it0.next();
+    }
+
+    /**
+     * Same scenario as the replicated case, but the table is created without a template
+     * and both queries are marked local with {@code setLocal(true)}.
+     * <p>
+     * Ten rows are inserted first. One row is read from the first iterator, the identical
+     * local query is then fetched in full with {@code getAll()}, and the first iterator is
+     * advanced once more.
+     */
+    @Test
+    public void testLocalQuery() {
+        sql("CREATE TABLE tbl (id LONG PRIMARY KEY, val LONG)").getAll();
+
+        for (int i = 0; i < 10; i++)
+            sql("insert into tbl(id,val) VALUES(" + i + "," + i + 
")").getAll();
+
+        Iterator<List<?>> it0 = sql(
+            new SqlFieldsQuery("SELECT * FROM tbl where id > ?")
+                .setArgs(1)
+                .setLocal(true))
+            .iterator();
+
+        it0.next();
+
+        sql(new SqlFieldsQuery("SELECT * FROM tbl where id > 
?").setArgs(1).setLocal(true)).getAll();
+
+        it0.next();
+    }
+
+    /**
+     * Runs a query given as plain text by wrapping it in a {@link SqlFieldsQuery} with no
+     * arguments and delegating to the {@link SqlFieldsQuery} overload of this helper.
+     *
+     * @param sql SQL query.
+     * @return Results.
+     */
+    private FieldsQueryCursor<List<?>> sql(String sql) {
+        return sql(new SqlFieldsQuery(sql));
+    }
+
+    /**
+     * Runs the query through the query processor obtained from {@code grid(0)}.
+     * <p>
+     * NOTE(review): the second argument passed to {@code querySqlFields} is presumably the
+     * keep-binary flag; confirm against {@code GridQueryProcessor}.
+     *
+     * @param qry SQL query.
+     * @return Results.
+     */
+    private FieldsQueryCursor<List<?>> sql(SqlFieldsQuery qry) {
+        GridQueryProcessor qryProc = grid(0).context().query();
+
+        return qryProc.querySqlFields(qry, true);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/86a815e1/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite2.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite2.java
 
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite2.java
index e4c918e..8273e9e 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite2.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite2.java
@@ -52,6 +52,7 @@ import 
org.apache.ignite.internal.processors.query.IgniteCacheGroupsCompareQuery
 import 
org.apache.ignite.internal.processors.query.IgniteCacheGroupsSqlDistributedJoinSelfTest;
 import 
org.apache.ignite.internal.processors.query.IgniteCacheGroupsSqlSegmentedIndexMultiNodeSelfTest;
 import 
org.apache.ignite.internal.processors.query.IgniteCacheGroupsSqlSegmentedIndexSelfTest;
+import 
org.apache.ignite.internal.processors.query.SqlLocalQueryConnectionAndStatementTest;
 import 
org.apache.ignite.internal.processors.query.h2.CacheQueryEntityWithDateTimeApiFieldsTest;
 import 
org.apache.ignite.internal.processors.query.h2.twostep.CacheQueryMemoryLeakTest;
 import 
org.apache.ignite.internal.processors.query.h2.twostep.CreateTableWithDateKeySelfTest;
@@ -134,6 +135,8 @@ public class IgniteBinaryCacheQueryTestSuite2 {
 
         suite.addTest(new 
JUnit4TestAdapter(IgniteCacheQueriesLoadTest1.class));
 
+        suite.addTest(new 
JUnit4TestAdapter(SqlLocalQueryConnectionAndStatementTest.class));
+
         return suite;
     }
 }

Reply via email to