IGNITE-7095: SQL: fixed per-thread H2 connection leak. This closes #3141.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/316feb86 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/316feb86 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/316feb86 Branch: refs/heads/ignite-zk-ce Commit: 316feb86d4d8d2f841601b24d07eef7c049f4113 Parents: 1e8eaff Author: tledkov-gridgain <[email protected]> Authored: Tue Dec 12 16:19:55 2017 +0300 Committer: devozerov <[email protected]> Committed: Tue Dec 12 16:19:55 2017 +0300 ---------------------------------------------------------------------- .../processors/query/h2/IgniteH2Indexing.java | 67 +++++++- .../cache/index/H2ConnectionLeaksSelfTest.java | 169 +++++++++++++++++++ .../IgniteCacheQuerySelfTestSuite.java | 2 + 3 files changed, 231 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/316feb86/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index 83eaa33..6fdcd27 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -221,6 +221,9 @@ public class IgniteH2Indexing implements GridQueryIndexing { /** The period of clean up the {@link #stmtCache}. */ private final Long CLEANUP_STMT_CACHE_PERIOD = Long.getLong(IGNITE_H2_INDEXING_CACHE_CLEANUP_PERIOD, 10_000); + /** The period of clean up the {@link #conns}. */ + private final Long CLEANUP_CONNECTIONS_PERIOD = 2000L; + /** The timeout to remove entry from the {@link #stmtCache} if the thread doesn't perform any queries. */ private final Long STATEMENT_CACHE_THREAD_USAGE_TIMEOUT = Long.getLong(IGNITE_H2_INDEXING_CACHE_THREAD_USAGE_TIMEOUT, 600 * 1000); @@ -228,6 +231,9 @@ public class IgniteH2Indexing implements GridQueryIndexing { /** */ private GridTimeoutProcessor.CancelableTask stmtCacheCleanupTask; + /** */ + private GridTimeoutProcessor.CancelableTask connCleanupTask; + /** Logger. */ @LoggerResource private IgniteLogger log; @@ -245,7 +251,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { private String dbUrl = "jdbc:h2:mem:"; /** */ - private final Collection<Connection> conns = Collections.synchronizedCollection(new ArrayList<Connection>()); + private final ConcurrentMap<Thread, Connection> conns = new ConcurrentHashMap8<>(); /** */ private GridMapQueryExecutor mapQryExec; @@ -304,7 +310,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { throw new IgniteSQLException("Failed to initialize DB connection: " + dbUrl, e); } - conns.add(c); + conns.put(Thread.currentThread(), c); return new H2ConnectionWrapper(c); } @@ -485,6 +491,9 @@ public class IgniteH2Indexing implements GridQueryIndexing { private void createSchema(String schema) throws IgniteCheckedException { executeStatement("INFORMATION_SCHEMA", "CREATE SCHEMA IF NOT EXISTS " + H2Utils.withQuotes(schema)); + // This method is typically called from internal Ignite threads on bootstrap, no need to cache this connection. + conns.remove(Thread.currentThread()); + if (log.isDebugEnabled()) log.debug("Created H2 schema for index database: " + schema); } @@ -561,7 +570,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { connCache.set(null); if (conn != null) { - conns.remove(conn); + conns.remove(Thread.currentThread()); // Reset connection to receive new one at next call. U.close(conn, log); @@ -782,6 +791,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { * @param tbl Table. * @param pk Primary key flag. * @param cols Columns. + * @param inlineSize Index inline size. * @return Index. */ public GridH2IndexBase createSortedIndex(String name, GridH2Table tbl, boolean pk, List<IndexColumn> cols, @@ -949,6 +959,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { * * @param conn Connection,. * @param stmt Statement. + * @param timeoutMillis Query timeout. * @param cancel Query cancel. * @return Result. * @throws IgniteCheckedException If failed. @@ -1021,6 +1032,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { * @param sql Sql query. * @param params Parameters. * @param useStmtCache If {@code true} uses stmt cache. + * @param timeoutMillis Query timeout. * @param cancel Query cancel. * @return Result. * @throws IgniteCheckedException If failed. @@ -1038,6 +1050,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { * @param conn Connection. * @param sql Sql query. * @param params Parameters. + * @param timeoutMillis Query timeout. * @param cancel Query cancel. * @return Result. * @throws IgniteCheckedException If failed. @@ -1172,7 +1185,9 @@ public class IgniteH2Indexing implements GridQueryIndexing { * @param alias Table alias. * @param params Query parameters. * @param type Query return type. - * @param filter Cache name and key filter. @return Queried rows. + * @param filter Cache name and key filter. + * @param cancel Cancel object. + * @return Queried rows. * @throws IgniteCheckedException If failed. */ @SuppressWarnings("unchecked") @@ -1216,6 +1231,9 @@ public class IgniteH2Indexing implements GridQueryIndexing { * @param qry Query. * @param keepCacheObj Flag to keep cache object. * @param enforceJoinOrder Enforce join order of tables. + * @param timeoutMillis Query timeout. + * @param cancel Cancel object. + * @param params Query parameters. * @param parts Partitions. * @param lazy Lazy query execution flag. * @return Iterable result. @@ -1680,6 +1698,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { } /** + * @param cacheIds Caches identifiers. * @throws IllegalStateException if segmented indices used with non-segmented indices. */ private void checkCacheIndexSegmentation(List<Integer> cacheIds) { @@ -1939,6 +1958,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { * Get table descriptor. * * @param schemaName Schema name. + * @param cacheName Cache name. * @param type Type name. * @return Descriptor. */ @@ -2008,6 +2028,23 @@ public class IgniteH2Indexing implements GridQueryIndexing { } /** + * Called periodically by {@link GridTimeoutProcessor} to clean up the {@link #stmtCache}. + */ + private void cleanupConnections() { + for (Iterator<Map.Entry<Thread, Connection>> it = conns.entrySet().iterator(); it.hasNext(); ) { + Map.Entry<Thread, Connection> entry = it.next(); + + Thread t = entry.getKey(); + + if (t.getState() == Thread.State.TERMINATED) { + U.close(entry.getValue(), log); + + it.remove(); + } + } + } + + /** * Rebuild indexes from hash index. * * @param cacheName Cache name. @@ -2143,6 +2180,12 @@ public class IgniteH2Indexing implements GridQueryIndexing { U.warn(log, "Custom H2 serialization is already configured, will override."); JdbcUtils.serializer = h2Serializer(); + + connCleanupTask = ctx.timeout().schedule(new Runnable() { + @Override public void run() { + cleanupConnections(); + } + }, CLEANUP_CONNECTIONS_PERIOD, CLEANUP_CONNECTIONS_PERIOD); } /** @@ -2306,7 +2349,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { mapQryExec.cancelLazyWorkers(); - for (Connection c : conns) + for (Connection c : conns.values()) U.close(c, log); conns.clear(); @@ -2323,6 +2366,9 @@ public class IgniteH2Indexing implements GridQueryIndexing { if (stmtCacheCleanupTask != null) stmtCacheCleanupTask.close(); + if (connCleanupTask != null) + connCleanupTask.close(); + GridH2QueryContext.clearLocalNodeStop(nodeId); if (log.isDebugEnabled()) @@ -2570,8 +2616,15 @@ public class IgniteH2Indexing implements GridQueryIndexing { @Override public void cancelAllQueries() { mapQryExec.cancelLazyWorkers(); - for (Connection conn : conns) - U.close(conn, log); + for (Connection c : conns.values()) + U.close(c, log); + } + + /** + * @return Per-thread connections. + */ + public Map<Thread, Connection> perThreadConnections() { + return conns; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/316feb86/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2ConnectionLeaksSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2ConnectionLeaksSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2ConnectionLeaksSelfTest.java new file mode 100644 index 0000000..417c1f3 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2ConnectionLeaksSelfTest.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.index; + +import java.sql.Connection; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * Test for leaks JdbcConnection on SqlFieldsQuery execute. + */ +public class H2ConnectionLeaksSelfTest extends GridCommonAbstractTest { + /** Cache name. */ + private static final String CACHE_NAME = "cache"; + + /** Nodes count. */ + private static final int NODE_CNT = 2; + + /** Keys count. */ + private static final int KEY_CNT = 100; + + /** Threads count. */ + private static final int THREAD_CNT = 100; + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + Ignite node = startGrids(NODE_CNT); + + IgniteCache<Long, String> cache = node.cache(CACHE_NAME); + + for (int i = 0; i < KEY_CNT; i++) + cache.put((long)i, String.valueOf(i)); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + CacheConfiguration<Long, String> ccfg = new CacheConfiguration<Long, String>().setName(CACHE_NAME) + .setIndexedTypes(Long.class, String.class); + + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setCacheConfiguration(ccfg); + + if (getTestIgniteInstanceIndex(igniteInstanceName) != 0) + cfg.setClientMode(true); + + return cfg; + } + + /** + * @throws Exception On failed. + */ + public void testConnectionLeaks() throws Exception { + final IgniteCache cache = grid(1).cache(CACHE_NAME); + + final CountDownLatch latch = new CountDownLatch(THREAD_CNT); + + for (int i = 0; i < THREAD_CNT; i++) { + new Thread() { + @Override public void run() { + SqlFieldsQuery qry = new SqlFieldsQuery("select * from String").setLocal(false); + + cache.query(qry).getAll(); + + latch.countDown(); + } + }.start(); + } + + latch.await(); + + boolean res = GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + for (int i = 0; i < NODE_CNT; i++) { + Map<Thread, Connection> conns = perThreadConnections(i); + + if (conns.isEmpty()) + return false; + } + + return true; + } + }, 5000); + + assert res; + } + + /** + * @throws Exception On failed. + */ + public void testConnectionLeaksOnSqlException() throws Exception { + final CountDownLatch latch = new CountDownLatch(THREAD_CNT); + final CountDownLatch latch2 = new CountDownLatch(1); + + for (int i = 0; i < THREAD_CNT; i++) { + new Thread() { + @Override public void run() { + try { + IgniteH2Indexing idx = (IgniteH2Indexing)grid(1).context().query().getIndexing(); + + idx.executeStatement(CACHE_NAME, "select *"); + } + catch (Exception e) { + // No-op. + } + + latch.countDown(); + + try { + latch2.await(); + } + catch (InterruptedException e) { + // No-op; + } + } + }.start(); + } + + try { + latch.await(); + + for (int i = 0; i < NODE_CNT; i++) { + Map<Thread, Connection> conns = perThreadConnections(i); + + assertTrue(conns.isEmpty()); + } + } + finally { + latch2.countDown(); + } + } + + /** + * @param nodeIdx Node index. + * @return Per-thread connections. + */ + private Map<Thread, Connection> perThreadConnections(int nodeIdx) { + return ((IgniteH2Indexing)grid(nodeIdx).context().query().getIndexing()).perThreadConnections(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/316feb86/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java index 7b3b271..4d8016b 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java @@ -119,6 +119,7 @@ import org.apache.ignite.internal.processors.cache.index.H2DynamicIndexingComple import org.apache.ignite.internal.processors.cache.index.H2DynamicIndexingComplexServerTransactionalPartitionedTest; import org.apache.ignite.internal.processors.cache.index.H2DynamicIndexingComplexServerTransactionalReplicatedTest; import org.apache.ignite.internal.processors.cache.index.H2DynamicTableSelfTest; +import org.apache.ignite.internal.processors.cache.index.H2ConnectionLeaksSelfTest; import org.apache.ignite.internal.processors.cache.index.LongIndexNameTest; import org.apache.ignite.internal.processors.cache.index.SchemaExchangeSelfTest; import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalAtomicQuerySelfTest; @@ -366,6 +367,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite { suite.addTestSuite(LongIndexNameTest.class); suite.addTestSuite(GridCacheQuerySqlFieldInlineSizeSelfTest.class); suite.addTestSuite(IgniteSqlParameterizedQueryTest.class); + suite.addTestSuite(H2ConnectionLeaksSelfTest.class); suite.addTestSuite(IgniteCheckClusterStateBeforeExecuteQueryTest.class);
