IGNITE-4518 Fixed parallel load of cache. - Fixes #1426. Signed-off-by: Andrey Novikov <anovi...@gridgain.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/79401b2e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/79401b2e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/79401b2e

Branch: refs/heads/ignite-comm-balance-master
Commit: 79401b2ebf814eeffa36d41a82d1238d8eccc7e9
Parents: 6045a24
Author: Andrey Novikov <anovi...@gridgain.com>
Authored: Mon Jan 16 10:33:16 2017 +0700
Committer: Andrey Novikov <anovi...@gridgain.com>
Committed: Mon Jan 16 10:33:16 2017 +0700

----------------------------------------------------------------------
 .../store/jdbc/dialect/BasicJdbcDialect.java | 31 ++++++++-----
 .../store/jdbc/CacheJdbcPojoStoreTest.java   | 48 +++++++++++++++++++-
 2 files changed, 66 insertions(+), 13 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/79401b2e/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/BasicJdbcDialect.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/BasicJdbcDialect.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/BasicJdbcDialect.java
index 3ab112a..139f3fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/BasicJdbcDialect.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/dialect/BasicJdbcDialect.java
@@ -173,13 +173,15 @@ public class BasicJdbcDialect implements JdbcDialect {
         if (appendLowerBound) {
             sb.a("(");
 
-            for (int cnt = keyCols.size(); cnt > 0; cnt--) {
-                for (int j = 0; j < cnt; j++)
-                    if (j == cnt - 1)
-                        sb.a(cols[j]).a(" > ? ");
+            for (int keyCnt = keyCols.size(); keyCnt > 0; keyCnt--) {
+                for (int idx = 0; idx < keyCnt; idx++) {
+                    if (idx == keyCnt - 1)
+                        sb.a(cols[idx]).a(" > ? ");
                     else
-                        sb.a(cols[j]).a(" = ? AND ");
-                if (cnt != 1)
+                        sb.a(cols[idx]).a(" = ? AND ");
+                }
+
+                if (keyCnt != 1)
                     sb.a("OR ");
             }
 
@@ -192,13 +194,18 @@ public class BasicJdbcDialect implements JdbcDialect {
         if (appendUpperBound) {
             sb.a("(");
 
-            for (int cnt = keyCols.size(); cnt > 0; cnt--) {
-                for (int j = 0; j < cnt; j++)
-                    if (j == cnt - 1)
-                        sb.a(cols[j]).a(" <= ? ");
+            for (int keyCnt = keyCols.size(); keyCnt > 0; keyCnt--) {
+                for (int idx = 0, lastIdx = keyCnt - 1; idx < keyCnt; idx++) {
+                    sb.a(cols[idx]);
+
+                    // For composite key when not all of the key columns are constrained should use < (strictly less).
+                    if (idx == lastIdx)
+                        sb.a(keyCnt == keyCols.size() ? " <= ? " : " < ? ");
                     else
-                        sb.a(cols[j]).a(" = ? AND ");
-                if (cnt != 1)
+                        sb.a(" = ? AND ");
+                }
+
+                if (keyCnt != 1)
                     sb.a(" OR ");
             }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/79401b2e/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTest.java
index d8f75d3..4a0b1da 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/store/jdbc/CacheJdbcPojoStoreTest.java
@@ -216,7 +216,7 @@ public class CacheJdbcPojoStoreTest extends GridAbstractCacheStoreSelfTest<Cache
 
         stmt.executeUpdate("CREATE TABLE IF NOT EXISTS " +
             "Person_Complex (id integer not null, org_id integer not null, city_id integer not null, " +
-            "name varchar(50), salary integer, PRIMARY KEY(id))");
+            "name varchar(50), salary integer, PRIMARY KEY(id, org_id, city_id))");
 
         conn.commit();
 
@@ -352,6 +352,52 @@ public class CacheJdbcPojoStoreTest extends GridAbstractCacheStoreSelfTest<Cache
     /**
      * @throws Exception If failed.
      */
+    public void testParallelLoad() throws Exception {
+        Connection conn = store.openConnection(false);
+
+        PreparedStatement prnComplexStmt = conn.prepareStatement("INSERT INTO Person_Complex(id, org_id, city_id, name, salary) VALUES (?, ?, ?, ?, ?)");
+
+        for (int i = 0; i < 8; i++) {
+
+            prnComplexStmt.setInt(1, (i >> 2) & 1);
+            prnComplexStmt.setInt(2, (i >> 1) & 1);
+            prnComplexStmt.setInt(3, i % 2);
+
+            prnComplexStmt.setString(4, "name");
+            prnComplexStmt.setInt(5, 1000 + i * 500);
+
+            prnComplexStmt.addBatch();
+        }
+
+        prnComplexStmt.executeBatch();
+
+        U.closeQuiet(prnComplexStmt);
+
+        conn.commit();
+
+        U.closeQuiet(conn);
+
+        final Collection<PersonComplexKey> prnComplexKeys = new ConcurrentLinkedQueue<>();
+
+        IgniteBiInClosure<Object, Object> c = new CI2<Object, Object>() {
+            @Override public void apply(Object k, Object v) {
+                if (k instanceof PersonComplexKey && v instanceof Person)
+                    prnComplexKeys.add((PersonComplexKey)k);
+                else
+                    fail("Unexpected entry [key=" + k + ", value=" + v + "]");
+            }
+        };
+
+        store.setParallelLoadCacheMinimumThreshold(2);
+
+        store.loadCache(c);
+
+        assertEquals(8, prnComplexKeys.size());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testWriteRetry() throws Exception {
         CacheJdbcPojoStore<Object, Object> store = store();