IGNITE-5738: JDBC: add batch support. This closes #2393.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7781823d Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7781823d Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7781823d Branch: refs/heads/ignite-5578 Commit: 7781823d4552bb30efa48758b7473d07c9e8aee3 Parents: 9cfb050 Author: Sergey Kalashnikov <[email protected]> Authored: Thu Aug 17 14:58:38 2017 +0300 Committer: devozerov <[email protected]> Committed: Thu Aug 17 14:58:38 2017 +0300 ---------------------------------------------------------------------- .../jdbc2/JdbcDeleteStatementSelfTest.java | 22 ++ .../jdbc2/JdbcInsertStatementSelfTest.java | 159 ++++++++++++++ .../jdbc2/JdbcMergeStatementSelfTest.java | 41 ++++ .../jdbc2/JdbcStatementBatchingSelfTest.java | 133 ++++++++++++ .../jdbc2/JdbcUpdateStatementSelfTest.java | 24 +++ .../jdbc/suite/IgniteJdbcDriverTestSuite.java | 3 + .../internal/jdbc2/JdbcBatchUpdateTask.java | 215 +++++++++++++++++++ .../internal/jdbc2/JdbcDatabaseMetadata.java | 2 +- .../internal/jdbc2/JdbcPreparedStatement.java | 25 ++- .../ignite/internal/jdbc2/JdbcStatement.java | 59 ++++- 10 files changed, 675 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/7781823d/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcDeleteStatementSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcDeleteStatementSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcDeleteStatementSelfTest.java index d55c979..3eec5a0 100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcDeleteStatementSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcDeleteStatementSelfTest.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.jdbc2; +import java.sql.PreparedStatement; import java.sql.SQLException; import java.util.Arrays; import java.util.HashSet; @@ -46,4 +47,25 @@ public class JdbcDeleteStatementSelfTest extends JdbcAbstractUpdateStatementSelf assertFalse(jcache(0).containsKey("p2")); assertTrue(jcache(0).containsKeys(new HashSet<Object>(Arrays.asList("p1", "p3")))); } + + /** + * + */ + public void testBatch() throws SQLException { + PreparedStatement ps = conn.prepareStatement("delete from Person where firstName = ?"); + + ps.setString(1, "John"); + + ps.addBatch(); + + ps.setString(1, "Harry"); + + ps.addBatch(); + + int[] res = ps.executeBatch(); + + assertFalse(jcache(0).containsKey("p1")); + assertTrue(jcache(0).containsKeys(new HashSet<Object>(Arrays.asList("p2", "p3")))); + assertTrue(Arrays.equals(new int[] {1, 0}, res)); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/7781823d/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcInsertStatementSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcInsertStatementSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcInsertStatementSelfTest.java index 0e7539f..407d6e2 100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcInsertStatementSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcInsertStatementSelfTest.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.jdbc2; +import java.sql.BatchUpdateException; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; @@ -26,6 +27,7 @@ import java.util.HashSet; import java.util.concurrent.Callable; import org.apache.ignite.cache.CachePeekMode; import org.apache.ignite.internal.processors.query.IgniteSQLException; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.testframework.GridTestUtils; /** @@ -174,4 +176,161 @@ public class JdbcInsertStatementSelfTest extends JdbcAbstractDmlStatementSelfTes assertEquals(3, jcache(0).withKeepBinary().getAll(new HashSet<>(Arrays.asList("p1", "p2", "p3"))).size()); } + + /** + * @throws SQLException if failed. + */ + public void testBatch() throws SQLException { + formBatch(1, 2); + formBatch(3, 4); + + int[] res = prepStmt.executeBatch(); + + assertTrue(Arrays.equals(new int[] {2, 2}, res)); + } + + /** + * @throws SQLException if failed. + */ + public void testSingleItemBatch() throws SQLException { + formBatch(1, 2); + + int[] res = prepStmt.executeBatch(); + + assertTrue(Arrays.equals(new int[] {2}, res)); + } + + /** + * @throws SQLException if failed. + */ + public void testSingleItemBatchError() throws SQLException { + formBatch(1, 2); + + prepStmt.executeBatch(); + + formBatch(1, 2); // Duplicate key + + BatchUpdateException reason = (BatchUpdateException) + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + return prepStmt.executeBatch(); + } + }, + BatchUpdateException.class, + "Failed to INSERT some keys because they are already in cache"); + + // Check update counts in the exception. + assertTrue(F.isEmpty(reason.getUpdateCounts())); + } + + /** + * @throws SQLException if failed. + */ + public void testErrorAmidstBatch() throws SQLException { + formBatch(1, 2); + formBatch(3, 1); // Duplicate key + + BatchUpdateException reason = (BatchUpdateException) + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + return prepStmt.executeBatch(); + } + }, + BatchUpdateException.class, + "Failed to INSERT some keys because they are already in cache"); + + // Check update counts in the exception. + int[] counts = reason.getUpdateCounts(); + + assertNotNull(counts); + + assertEquals(1, counts.length); + assertEquals(2, counts[0]); + } + + /** + * @throws Exception If failed. + */ + public void testClearBatch() throws Exception { + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws SQLException { + return prepStmt.executeBatch(); + } + }, SQLException.class, "Batch is empty"); + + formBatch(1, 2); + + prepStmt.clearBatch(); + + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws SQLException { + return prepStmt.executeBatch(); + } + }, SQLException.class, "Batch is empty"); + } + + /** + * Form batch on prepared statement. + * + * @param id1 id for first row. + * @param id2 id for second row. + * @throws SQLException if failed. + */ + private void formBatch(int id1, int id2) throws SQLException { + int[] ids = new int[] { id1, id2 }; + + int arg = 0; + for (int id: ids) { + String key = "p" + id; + + switch (id) { + case 1: + prepStmt.setString(arg + 1, key); + prepStmt.setInt(arg + 2, 1); + prepStmt.setString(arg + 3, "John"); + prepStmt.setString(arg + 4, "White"); + prepStmt.setInt(arg + 5, 25); + prepStmt.setBytes(arg + 6, getBytes("White")); + + break; + + case 2: + prepStmt.setString(arg + 1, key); + prepStmt.setInt(arg + 2, 2); + prepStmt.setString(arg + 3, "Joe"); + prepStmt.setString(arg + 4, "Black"); + prepStmt.setInt(arg + 5, 35); + prepStmt.setBytes(arg + 6, getBytes("Black")); + + break; + + case 3: + prepStmt.setString(arg + 1, key); + prepStmt.setInt(arg + 2, 3); + prepStmt.setString(arg + 3, "Mike"); + prepStmt.setString(arg + 4, "Green"); + prepStmt.setInt(arg + 5, 40); + prepStmt.setBytes(arg + 6, getBytes("Green")); + + break; + + case 4: + prepStmt.setString(arg + 1, key); + prepStmt.setInt(arg + 2, 4); + prepStmt.setString(arg + 3, "Leah"); + prepStmt.setString(arg + 4, "Grey"); + prepStmt.setInt(arg + 5, 22); + prepStmt.setBytes(arg + 6, getBytes("Grey")); + + break; + + default: + assert false; + } + + arg += 6; + } + + prepStmt.addBatch(); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/7781823d/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcMergeStatementSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcMergeStatementSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcMergeStatementSelfTest.java index 1432a78..489bacd 100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcMergeStatementSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcMergeStatementSelfTest.java @@ -21,6 +21,7 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.util.Arrays; import org.apache.ignite.cache.CachePeekMode; /** @@ -143,4 +144,44 @@ public class JdbcMergeStatementSelfTest extends JdbcAbstractDmlStatementSelfTest assertEquals(false, res); } + + /** + * @throws SQLException if failed. + */ + public void testBatch() throws SQLException { + prepStmt.setString(1, "p1"); + prepStmt.setInt(2, 1); + prepStmt.setString(3, "John"); + prepStmt.setString(4, "White"); + prepStmt.setInt(5, 25); + prepStmt.setBytes(6, getBytes("White")); + + prepStmt.setString(7, "p2"); + prepStmt.setInt(8, 2); + prepStmt.setString(9, "Joe"); + prepStmt.setString(10, "Black"); + prepStmt.setInt(11, 35); + prepStmt.setBytes(12, getBytes("Black")); + prepStmt.addBatch(); + + prepStmt.setString(1, "p3"); + prepStmt.setInt(2, 3); + prepStmt.setString(3, "Mike"); + prepStmt.setString(4, "Green"); + prepStmt.setInt(5, 40); + prepStmt.setBytes(6, getBytes("Green")); + + prepStmt.setString(7, "p4"); + prepStmt.setInt(8, 4); + prepStmt.setString(9, "Leah"); + prepStmt.setString(10, "Grey"); + prepStmt.setInt(11, 22); + prepStmt.setBytes(12, getBytes("Grey")); + + prepStmt.addBatch(); + + int[] res = prepStmt.executeBatch(); + + assertTrue(Arrays.equals(new int[] {2, 2}, res)); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/7781823d/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcStatementBatchingSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcStatementBatchingSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcStatementBatchingSelfTest.java new file mode 100644 index 0000000..c9169b9 --- /dev/null +++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcStatementBatchingSelfTest.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.jdbc2; + +import java.sql.BatchUpdateException; +import java.sql.DatabaseMetaData; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.concurrent.Callable; +import org.apache.ignite.testframework.GridTestUtils; + +/** + * Statement batch test. + */ +public class JdbcStatementBatchingSelfTest extends JdbcAbstractDmlStatementSelfTest { + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + jcache(0).clear(); + } + + /** + * @throws SQLException If failed. + */ + public void testDatabaseMetadataBatchSupportFlag() throws SQLException { + DatabaseMetaData meta = conn.getMetaData(); + + assertNotNull(meta); + + assertTrue(meta.supportsBatchUpdates()); + } + + /** + * @throws SQLException If failed. + */ + public void testBatch() throws SQLException { + try (Statement stmt = conn.createStatement()) { + stmt.addBatch("INSERT INTO Person(_key, id, firstName, lastName, age, data) " + + "VALUES ('p1', 0, 'J', 'W', 250, RAWTOHEX('W'))"); + + stmt.addBatch("MERGE INTO Person(_key, id, firstName, lastName, age, data) VALUES " + + "('p1', 1, 'John', 'White', 25, RAWTOHEX('White')), " + + "('p2', 2, 'Joe', 'Black', 35, RAWTOHEX('Black')), " + + "('p3', 0, 'M', 'G', 4, RAWTOHEX('G'))"); + + stmt.addBatch("UPDATE Person SET id = 3, firstName = 'Mike', lastName = 'Green', " + + "age = 40, data = RAWTOHEX('Green') WHERE _key = 'p3'"); + + stmt.addBatch("DELETE FROM Person WHERE _key = 'p1'"); + + int[] res = stmt.executeBatch(); + + assertEquals(4, res.length); + assertEquals(1, res[0]); + assertEquals(3, res[1]); + assertEquals(1, res[2]); + assertEquals(1, res[3]); + } + } + + /** + * @throws SQLException If failed. + */ + public void testErrorAmidstBatch() throws SQLException { + BatchUpdateException reason = (BatchUpdateException) + GridTestUtils.assertThrows(log, + new Callable<Object>() { + @Override public Object call() throws Exception { + try (Statement stmt = conn.createStatement()) { + stmt.addBatch("INSERT INTO Person(_key, id, firstName, lastName, age, data) " + + "VALUES ('p1', 0, 'J', 'W', 250, RAWTOHEX('W'))"); + + stmt.addBatch("UPDATE Person SET id = 3, firstName = 'Mike', lastName = 'Green', " + + "age = 40, data = RAWTOHEX('Green') WHERE _key = 'p3'"); + + stmt.addBatch("SELECT id FROM Person WHERE _key = 'p1'"); + + return stmt.executeBatch(); + } + } + }, + BatchUpdateException.class, + "Given statement type does not match that declared by JDBC driver"); + + // Check update counts in the exception. + int[] counts = reason.getUpdateCounts(); + + assertEquals(2, counts.length); + assertEquals(1, counts[0]); + assertEquals(0, counts[1]); + } + + /** + * @throws Exception If failed. + */ + public void testClearBatch() throws Exception { + try (Statement stmt = conn.createStatement()) { + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws SQLException { + return stmt.executeBatch(); + } + }, SQLException.class, "Batch is empty"); + + stmt.addBatch("INSERT INTO Person(_key, id, firstName, lastName, age, data) " + + "VALUES ('p1', 0, 'J', 'W', 250, RAWTOHEX('W'))"); + + stmt.clearBatch(); + + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws SQLException { + return stmt.executeBatch(); + } + }, SQLException.class, "Batch is empty"); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/7781823d/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcUpdateStatementSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcUpdateStatementSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcUpdateStatementSelfTest.java index 8ae0e90..07b5587 100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcUpdateStatementSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcUpdateStatementSelfTest.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.jdbc2; +import java.sql.PreparedStatement; import java.sql.SQLException; import java.util.Arrays; import org.apache.ignite.cache.query.SqlFieldsQuery; @@ -47,4 +48,27 @@ public class JdbcUpdateStatementSelfTest extends JdbcAbstractUpdateStatementSelf assertEquals(Arrays.asList(F.asList("John"), F.asList("Jack"), F.asList("Mike")), jcache(0).query(new SqlFieldsQuery("select firstName from Person order by _key")).getAll()); } + + /** + * @throws SQLException If failed. + */ + public void testBatch() throws SQLException { + PreparedStatement ps = conn.prepareStatement("update Person set lastName = concat(firstName, 'son') " + + "where firstName = ?"); + + ps.setString(1, "John"); + + ps.addBatch(); + + ps.setString(1, "Harry"); + + ps.addBatch(); + + int[] res = ps.executeBatch(); + + assertEquals(Arrays.asList(F.asList("Johnson"), F.asList("Black"), F.asList("Green")), + jcache(0).query(new SqlFieldsQuery("select lastName from Person order by _key")).getAll()); + + assertTrue(Arrays.equals(new int[] {1, 0}, res)); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/7781823d/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java index cf7ee8f..a20002b 100644 --- a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java +++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java @@ -93,9 +93,12 @@ public class IgniteJdbcDriverTestSuite extends TestSuite { suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcDefaultNoOpCacheTest.class)); suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcMergeStatementSelfTest.class)); suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcBinaryMarshallerMergeStatementSelfTest.class)); + suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcUpdateStatementSelfTest.class)); suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcInsertStatementSelfTest.class)); suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcBinaryMarshallerInsertStatementSelfTest.class)); suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcDeleteStatementSelfTest.class)); + suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcStatementBatchingSelfTest.class)); + suite.addTest(new TestSuite(JdbcBlobTest.class)); suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcStreamingSelfTest.class)); http://git-wip-us.apache.org/repos/asf/ignite/blob/7781823d/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcBatchUpdateTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcBatchUpdateTask.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcBatchUpdateTask.java new file mode 100644 index 0000000..7b4846c --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcBatchUpdateTask.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.jdbc2; + +import java.sql.BatchUpdateException; +import java.sql.SQLException; +import java.util.Arrays; +import java.util.List; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteJdbcDriver; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.processors.cache.QueryCursorImpl; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.lang.IgniteCallable; +import org.apache.ignite.resources.IgniteInstanceResource; + +import static java.sql.Statement.SUCCESS_NO_INFO; + +/** + * Task for SQL batched update statements execution through {@link IgniteJdbcDriver}. + */ +class JdbcBatchUpdateTask implements IgniteCallable<int[]> { + /** Serial version uid. */ + private static final long serialVersionUID = 0L; + + /** Ignite. */ + @IgniteInstanceResource + private Ignite ignite; + + /** Cache name. */ + private final String cacheName; + + /** Schema name. */ + private final String schemaName; + + /** SQL command for argument batching. */ + private final String sql; + + /** Batch of statements. */ + private final List<String> sqlBatch; + + /** Batch of arguments. */ + private final List<List<Object>> batchArgs; + + /** Fetch size. */ + private final int fetchSize; + + /** Local execution flag. */ + private final boolean loc; + + /** Local query flag. */ + private final boolean locQry; + + /** Collocated query flag. */ + private final boolean collocatedQry; + + /** Distributed joins flag. */ + private final boolean distributedJoins; + + /** + * @param ignite Ignite. + * @param cacheName Cache name. + * @param schemaName Schema name. + * @param sql SQL query. {@code null} in case of statement batching. + * @param sqlBatch Batch of SQL statements. {@code null} in case of parameter batching. + * @param batchArgs Batch of SQL parameters. {@code null} in case of statement batching. + * @param loc Local execution flag. + * @param fetchSize Fetch size. + * @param locQry Local query flag. + * @param collocatedQry Collocated query flag. + * @param distributedJoins Distributed joins flag. + */ + public JdbcBatchUpdateTask(Ignite ignite, String cacheName, String schemaName, String sql, + List<String> sqlBatch, List<List<Object>> batchArgs, boolean loc, int fetchSize, + boolean locQry, boolean collocatedQry, boolean distributedJoins) { + this.ignite = ignite; + this.cacheName = cacheName; + this.schemaName = schemaName; + this.sql = sql; + this.sqlBatch = sqlBatch; + this.batchArgs = batchArgs; + this.fetchSize = fetchSize; + this.loc = loc; + this.locQry = locQry; + this.collocatedQry = collocatedQry; + this.distributedJoins = distributedJoins; + + assert (!F.isEmpty(sql) && !F.isEmpty(batchArgs)) ^ !F.isEmpty(sqlBatch); + } + + /** {@inheritDoc} */ + @Override public int[] call() throws Exception { + IgniteCache<?, ?> cache = ignite.cache(cacheName); + + // Don't create caches on server nodes in order to avoid of data rebalancing. + boolean start = ignite.configuration().isClientMode(); + + if (cache == null && cacheName == null) + cache = ((IgniteKernal)ignite).context().cache().getOrStartPublicCache(start, !loc && locQry); + + if (cache == null) { + if (cacheName == null) + throw new SQLException("Failed to execute query. No suitable caches found."); + else + throw new SQLException("Cache not found [cacheName=" + cacheName + ']'); + } + + int batchSize = F.isEmpty(sql) ? sqlBatch.size() : batchArgs.size(); + + int[] updCntrs = new int[batchSize]; + + int idx = 0; + + try { + if (F.isEmpty(sql)) { + for (; idx < batchSize; idx++) + updCntrs[idx] = doSingleUpdate(cache, sqlBatch.get(idx), null); + } + else { + for (; idx < batchSize; idx++) + updCntrs[idx] = doSingleUpdate(cache, sql, batchArgs.get(idx)); + } + } + catch (Exception ex) { + throw new BatchUpdateException(Arrays.copyOf(updCntrs, idx), ex); + } + + return updCntrs; + } + + /** + * Performs update. + * + * @param cache Cache. + * @param sqlText SQL text. + * @param args Parameters. + * @return Update counter. + * @throws SQLException If failed. + */ + private Integer doSingleUpdate(IgniteCache<?, ?> cache, String sqlText, List<Object> args) throws SQLException { + SqlFieldsQuery qry = new JdbcSqlFieldsQuery(sqlText, false); + + qry.setPageSize(fetchSize); + qry.setLocal(locQry); + qry.setCollocated(collocatedQry); + qry.setDistributedJoins(distributedJoins); + qry.setSchema(schemaName); + qry.setArgs(args == null ? null : args.toArray()); + + QueryCursorImpl<List<?>> qryCursor = (QueryCursorImpl<List<?>>)cache.withKeepBinary().query(qry); + + if (qryCursor.isQuery()) + throw new SQLException(getError("Query produced result set", qry)); + + List<List<?>> rows = qryCursor.getAll(); + + if (F.isEmpty(rows)) + return SUCCESS_NO_INFO; + + if (rows.size() != 1) + throw new SQLException(getError("Expected single row for update operation result", qry)); + + List<?> row = rows.get(0); + + if (F.isEmpty(row) || row.size() != 1) + throw new SQLException(getError("Expected row size of 1 for update operation", qry)); + + Object objRes = row.get(0); + + if (!(objRes instanceof Long)) + throw new SQLException(getError("Unexpected update result type", qry)); + + Long longRes = (Long)objRes; + + if (longRes > Integer.MAX_VALUE) { + IgniteLogger log = ignite.log(); + + if (log != null) + log.warning(getError("Query updated row counter (" + longRes + ") exceeds integer range", qry)); + + return Integer.MAX_VALUE; + } + + return longRes.intValue(); + } + + /** + * Formats error message with query details. + * + * @param msg Error message. + * @param qry Query. + * @return Result. + */ + private String getError(String msg, SqlFieldsQuery qry) { + return msg + " [qry='" + qry.getSql() + "', params=" + Arrays.deepToString(qry.getArgs()) + ']'; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/7781823d/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcDatabaseMetadata.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcDatabaseMetadata.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcDatabaseMetadata.java index 98a2563..b369b0b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcDatabaseMetadata.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcDatabaseMetadata.java @@ -1063,7 +1063,7 @@ public class JdbcDatabaseMetadata implements DatabaseMetaData { /** {@inheritDoc} */ @Override public boolean supportsBatchUpdates() throws SQLException { - return false; + return true; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/7781823d/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcPreparedStatement.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcPreparedStatement.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcPreparedStatement.java index 16030f7..38dfe02 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcPreparedStatement.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcPreparedStatement.java @@ -39,6 +39,7 @@ import java.sql.Time; import java.sql.Timestamp; import java.util.ArrayList; import java.util.Calendar; +import java.util.List; /** * JDBC prepared statement implementation. @@ -50,6 +51,9 @@ public class JdbcPreparedStatement extends JdbcStatement implements PreparedStat /** H2's parsed statement to retrieve metadata from. */ PreparedStatement nativeStatement; + /** Batch arguments. */ + private List<List<Object>> batchArgs; + /** * Creates new prepared statement. * @@ -66,7 +70,8 @@ public class JdbcPreparedStatement extends JdbcStatement implements PreparedStat @Override public void addBatch(String sql) throws SQLException { ensureNotClosed(); - throw new SQLFeatureNotSupportedException("Adding new SQL command to batch not supported for prepared statement."); + throw new SQLFeatureNotSupportedException("Adding new SQL command to batch is not supported for prepared " + + "statement (use addBatch() to add new set of arguments)"); } /** {@inheritDoc} */ @@ -185,7 +190,7 @@ public class JdbcPreparedStatement extends JdbcStatement implements PreparedStat @Override public void clearBatch() throws SQLException { ensureNotClosed(); - throw new SQLFeatureNotSupportedException("Batch statements are not supported yet."); + batchArgs = null; } /** {@inheritDoc} */ @@ -207,14 +212,26 @@ public class JdbcPreparedStatement extends JdbcStatement implements PreparedStat @Override public void addBatch() throws SQLException { ensureNotClosed(); - throw new SQLFeatureNotSupportedException("Batch statements are not supported yet."); + if (batchArgs == null) + batchArgs = new ArrayList<>(); + + batchArgs.add(args); + + args = null; } /** {@inheritDoc} */ @Override public int[] executeBatch() throws SQLException { - throw new SQLFeatureNotSupportedException("Batch statements are not supported yet."); + ensureNotClosed(); + + List<List<Object>> batchArgs = this.batchArgs; + + this.batchArgs = null; + + return doBatchUpdate(sql, null, batchArgs); } + /** {@inheritDoc} */ @Override public void setCharacterStream(int paramIdx, Reader x, int len) throws SQLException { ensureNotClosed(); http://git-wip-us.apache.org/repos/asf/ignite/blob/7781823d/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java index 89a80ca..19c20a3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java @@ -74,7 +74,7 @@ public class JdbcStatement implements Statement { /** Current updated items count. */ long updateCnt = -1; - /** Batch statements. */ + /** Batch of statements. */ private List<String> batch; /** @@ -187,7 +187,7 @@ public class JdbcStatement implements Statement { /** * @param rows query result. - * @return update counter, if found + * @return update counter, if found. * @throws SQLException if getting an update counter from result proved to be impossible. */ private static long updateCounterFromQueryResult(List<List<?>> rows) throws SQLException { @@ -461,7 +461,60 @@ public class JdbcStatement implements Statement { @Override public int[] executeBatch() throws SQLException { ensureNotClosed(); - throw new SQLFeatureNotSupportedException("Batch statements are not supported yet."); + List<String> batch = this.batch; + + this.batch = null; + + return doBatchUpdate(null, batch, null); + } + + /** + * Runs batch of update commands. + * + * @param command SQL command. + * @param batch Batch of SQL commands. + * @param batchArgs Batch of SQL parameters. + * @return Number of affected rows. + * @throws SQLException If failed. + */ + protected int[] doBatchUpdate(String command, List<String> batch, List<List<Object>> batchArgs) + throws SQLException { + rs = null; + + updateCnt = -1; + + if ((F.isEmpty(command) || F.isEmpty(batchArgs)) && F.isEmpty(batch)) + throw new SQLException("Batch is empty."); + + Ignite ignite = conn.ignite(); + + UUID nodeId = conn.nodeId(); + + boolean loc = nodeId == null; + + if (!conn.isDmlSupported()) + throw new SQLException("Failed to query Ignite: DML operations are supported in versions 1.8.0 and newer"); + + JdbcBatchUpdateTask task = new JdbcBatchUpdateTask(loc ? ignite : null, conn.cacheName(), + conn.schemaName(), command, batch, batchArgs, loc, getFetchSize(), conn.isLocalQuery(), + conn.isCollocatedQuery(), conn.isDistributedJoins()); + + try { + int[] res = loc ? task.call() : ignite.compute(ignite.cluster().forNodeId(nodeId)).call(task); + + updateCnt = F.isEmpty(res)? -1 : res[res.length - 1]; + + return res; + } + catch (IgniteSQLException e) { + throw e.toJdbcException(); + } + catch (SQLException e) { + throw e; + } + catch (Exception e) { + throw new SQLException("Failed to query Ignite.", e); + } } /** {@inheritDoc} */
