This is an automated email from the ASF dual-hosted git repository. ppa pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new f3792036b7c IGNITE-15245 Fixed JDBC connection leak with cache.invoke() over write-behind enabled cache (#10302) f3792036b7c is described below commit f3792036b7ccc5e41953f55bb5bdc3d6030e84b4 Author: Pavel Pereslegin <xxt...@gmail.com> AuthorDate: Mon Oct 17 11:49:58 2022 +0300 IGNITE-15245 Fixed JDBC connection leak with cache.invoke() over write-behind enabled cache (#10302) --- .../cache/store/GridCacheWriteBehindStore.java | 4 +- ...CacheJdbcPojoWriteBehindConnectionLeakTest.java | 134 +++++++++++++++++++++ ...teCacheStoreSessionWriteBehindAbstractTest.java | 2 +- .../ignite/testsuites/IgniteCacheTestSuite.java | 2 + 4 files changed, 140 insertions(+), 2 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java index 0cb1d90d656..ae647957cd8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheWriteBehindStore.java @@ -577,7 +577,9 @@ public class GridCacheWriteBehindStore<K, V> implements CacheStore<K, V>, Lifecy /** {@inheritDoc} */ @Override public void sessionEnd(boolean commit) { - // No-op. + // To prevent connection leaks, we must call CacheStore#sessionEnd + // on stores that manage connections themselves. + store.sessionEnd(commit); } /** {@inheritDoc} */ diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoWriteBehindConnectionLeakTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoWriteBehindConnectionLeakTest.java new file mode 100644 index 00000000000..cb255c7a4bf --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoWriteBehindConnectionLeakTest.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store.jdbc; + +import java.io.Serializable; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Types; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.store.jdbc.dialect.H2Dialect; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; +import org.h2.jdbcx.JdbcConnectionPool; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * Ensures that there are no connection leaks when working with an external data source. + */ +public class CacheJdbcPojoWriteBehindConnectionLeakTest extends GridCommonAbstractTest { + /** Table name. */ + private static final String TABLE_NAME = "person"; + + /** Connection pool. */ + private static final JdbcConnectionPool pool = (JdbcConnectionPool)new H2DataSourceFactory().create(); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + return super.getConfiguration(igniteInstanceName).setCacheConfiguration(cacheConfig()); + } + + /** + * @return Cache configuration. + */ + public CacheConfiguration<Integer, Person> cacheConfig() { + CacheConfiguration<Integer, Person> cacheConfig = new CacheConfiguration<Integer, Person>() + .setName(DEFAULT_CACHE_NAME) + .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL) + .setReadThrough(true) + .setWriteThrough(true) + .setWriteBehindEnabled(true) + .setWriteBehindFlushFrequency(1_000); + + CacheJdbcPojoStoreFactory<Integer, Person> cacheStoreFactory = new CacheJdbcPojoStoreFactory<>(); + + cacheStoreFactory.setDataSourceFactory(() -> pool) + .setDialect(new H2Dialect()) + .setTypes(new JdbcType() + .setCacheName(DEFAULT_CACHE_NAME) + .setDatabaseTable(TABLE_NAME) + .setKeyType(Integer.class) + .setKeyFields(new JdbcTypeField(Types.INTEGER, "id", Integer.class, "id")) + .setValueType(Person.class) + .setValueFields(new JdbcTypeField(Types.VARCHAR, "name", String.class, "name")) + ); + + cacheConfig.setCacheStoreFactory(cacheStoreFactory); + + return cacheConfig; + } + + /** */ + @Before + public void setUp() throws SQLException { + execStandaloneQuery("CREATE TABLE IF NOT EXISTS " + TABLE_NAME + "(ID INT UNSIGNED PRIMARY KEY, NAME VARCHAR(20))"); + } + + /** */ + @After + public void tearDown() throws SQLException { + execStandaloneQuery("DROP TABLE " + TABLE_NAME); + } + + /** */ + @Test + public void testInvoke() throws Exception { + try (Ignite ignite = startGrid(0)) { + IgniteCache<Integer, Person> cache = ignite.getOrCreateCache(DEFAULT_CACHE_NAME); + + try (Transaction tx = ignite.transactions().txStart()) { + cache.invoke(0, (entry, arg) -> true); + + tx.commit(); + } + } + + // We close ignite before assertion to be sure that write-behind flushing is finished. + assertEquals(0, pool.getActiveConnections()); + } + + /** + * @param sql SQL query. + * @throws SQLException If failed. + */ + private void execStandaloneQuery(String sql) throws SQLException { + try (Connection connection = pool.getConnection(); + PreparedStatement statement = connection.prepareStatement(sql)) { + statement.execute(); + } + } + + /** */ + public static class Person implements Serializable { + /** Serial version UID. */ + private static final long serialVersionUID = 0; + + /** */ + private Integer id; + + /** */ + private String name; + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheStoreSessionWriteBehindAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheStoreSessionWriteBehindAbstractTest.java index 52a1d54863d..0a86ed99bb9 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheStoreSessionWriteBehindAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheStoreSessionWriteBehindAbstractTest.java @@ -184,7 +184,7 @@ public abstract class IgniteCacheStoreSessionWriteBehindAbstractTest extends Ign /** {@inheritDoc} */ @Override public void sessionEnd(boolean commit) throws CacheWriterException { - fail(); + // No-op. } /** {@inheritDoc} */ diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java index b1d8b7d3f8b..8a2359b10be 100755 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java @@ -32,6 +32,7 @@ import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreBinaryMarshallerStor import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreBinaryMarshallerWithSqlEscapeSelfTest; import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreMultitreadedSelfTest; import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreTest; +import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoWriteBehindConnectionLeakTest; import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoWriteBehindStoreWithCoalescingTest; import org.apache.ignite.cache.store.jdbc.GridCacheJdbcBlobStoreMultithreadedSelfTest; import org.apache.ignite.cache.store.jdbc.GridCacheJdbcBlobStoreSelfTest; @@ -185,6 +186,7 @@ public class IgniteCacheTestSuite { GridTestUtils.addTestIfNeeded(suite, CacheJdbcPojoStoreBinaryMarshallerStoreKeepBinaryWithSqlEscapeSelfTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, CacheJdbcPojoStoreMultitreadedSelfTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, CacheJdbcPojoWriteBehindStoreWithCoalescingTest.class, ignoredTests); + GridTestUtils.addTestIfNeeded(suite, CacheJdbcPojoWriteBehindConnectionLeakTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, GridCacheBalancingStoreSelfTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, GridCacheAffinityApiSelfTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, GridCacheStoreValueBytesSelfTest.class, ignoredTests);