[CALCITE-1128] Implement JDBC batch update methods in remote driver This commit provides an implementation for:
* Statement.addBatch(String) * PreparedStatement.addBatch() * PreparedStatement.executeBatch() The implementation is fairly straightforward except for the addition of a new server interface: ProtobufMeta. This is a new interface which the Meta implementation can choose to also implement to provide a "native" implementation on top of Protobuf objects instead of the Avatica POJOs. During the investigations into Avatica performance pre-1.7.0, it was found that converting protobufs to POJOs was a very hot code path. This short-circuit helps us avoid extra objects on the heap and computation to create them in what should be a very hot code path for write-workloads. Closes apache/calcite#209 Project: http://git-wip-us.apache.org/repos/asf/calcite/repo Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/5dfa3f1e Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/5dfa3f1e Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/5dfa3f1e Branch: refs/heads/master Commit: 5dfa3f1ece54d2f95057c5b5097dc0f7fae693ee Parents: 857c06b Author: Josh Elser <[email protected]> Authored: Thu Mar 3 17:25:56 2016 -0500 Committer: Josh Elser <[email protected]> Committed: Thu Mar 24 00:25:32 2016 -0400 ---------------------------------------------------------------------- .../calcite/avatica/AvaticaConnection.java | 25 + .../avatica/AvaticaPreparedStatement.java | 37 +- .../calcite/avatica/AvaticaStatement.java | 33 +- .../java/org/apache/calcite/avatica/Meta.java | 29 + .../apache/calcite/avatica/proto/Requests.java | 2418 +++++++++++++++++- .../apache/calcite/avatica/proto/Responses.java | 970 ++++++- .../calcite/avatica/remote/JsonService.java | 16 + .../calcite/avatica/remote/LocalService.java | 32 + .../calcite/avatica/remote/ProtobufMeta.java | 45 + .../calcite/avatica/remote/ProtobufService.java | 8 + .../avatica/remote/ProtobufTranslationImpl.java | 15 + .../calcite/avatica/remote/RemoteMeta.java | 23 + .../apache/calcite/avatica/remote/Service.java | 276 +- 
.../calcite/avatica/remote/TypedValue.java | 145 +- avatica/core/src/main/protobuf/requests.proto | 18 + avatica/core/src/main/protobuf/responses.proto | 9 + .../avatica/remote/ExecuteBatchRequestTest.java | 79 + .../remote/ProtobufTranslationImplTest.java | 11 + .../calcite/avatica/test/JsonHandlerTest.java | 8 + .../apache/calcite/avatica/jdbc/JdbcMeta.java | 82 +- .../calcite/avatica/RemoteDriverTest.java | 398 +++ avatica/site/_docs/json_reference.md | 69 + avatica/site/_docs/protobuf_reference.md | 76 + 23 files changed, 4775 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/calcite/blob/5dfa3f1e/avatica/core/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java ---------------------------------------------------------------------- diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java b/avatica/core/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java index 2d89f45..091fe6b 100644 --- a/avatica/core/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java +++ b/avatica/core/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java @@ -16,6 +16,7 @@ */ package org.apache.calcite.avatica; +import org.apache.calcite.avatica.Meta.ExecuteBatchResult; import org.apache.calcite.avatica.Meta.MetaResultSet; import org.apache.calcite.avatica.remote.Service.ErrorResponse; import org.apache.calcite.avatica.remote.Service.OpenConnectionRequest; @@ -505,6 +506,25 @@ public abstract class AvaticaConnection implements Connection { return statement.openResultSet; } + /** Executes a batch update using an {@link AvaticaPreparedStatement}. + * + * @param pstmt The prepared statement. + * @return An array of update counts containing one element for each command in the batch. 
+ */ + protected int[] executeBatchUpdateInternal(AvaticaPreparedStatement pstmt) throws SQLException { + try { + // Get the handle from the statement + Meta.StatementHandle handle = pstmt.handle; + // Execute it against meta + final Meta.ExecuteBatchResult executeBatchResult = + meta.executeBatch(handle, pstmt.getParameterValueBatch()); + // Send back just the update counts + return executeBatchResult.updateCounts; + } catch (Exception e) { + throw helper.createException(e.getMessage(), e); + } + } + /** Returns whether a a statement is capable of updates and if so, * and the statement's {@code updateCount} is still -1, proceeds to * get updateCount value from statement's resultSet. @@ -581,6 +601,11 @@ public abstract class AvaticaConnection implements Connection { return meta.prepareAndExecute(statement.handle, sql, maxRowCount, callback); } + protected ExecuteBatchResult prepareAndUpdateBatch(final AvaticaStatement statement, + final List<String> queries) throws NoSuchStatementException, SQLException { + return meta.prepareAndExecuteBatch(statement.handle, queries); + } + protected ResultSet createResultSet(Meta.MetaResultSet metaResultSet, QueryState state) throws SQLException { final Meta.StatementHandle h = new Meta.StatementHandle( http://git-wip-us.apache.org/repos/asf/calcite/blob/5dfa3f1e/avatica/core/src/main/java/org/apache/calcite/avatica/AvaticaPreparedStatement.java ---------------------------------------------------------------------- diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/AvaticaPreparedStatement.java b/avatica/core/src/main/java/org/apache/calcite/avatica/AvaticaPreparedStatement.java index f3a950a..9b8a292 100644 --- a/avatica/core/src/main/java/org/apache/calcite/avatica/AvaticaPreparedStatement.java +++ b/avatica/core/src/main/java/org/apache/calcite/avatica/AvaticaPreparedStatement.java @@ -35,6 +35,7 @@ import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Time; import 
java.sql.Timestamp; +import java.util.ArrayList; import java.util.Arrays; import java.util.Calendar; import java.util.List; @@ -52,6 +53,7 @@ public abstract class AvaticaPreparedStatement private final ResultSetMetaData resultSetMetaData; private Calendar calendar; protected final TypedValue[] slots; + protected final List<List<TypedValue>> parameterValueBatch; /** * Creates an AvaticaPreparedStatement. @@ -75,12 +77,27 @@ public abstract class AvaticaPreparedStatement this.slots = new TypedValue[signature.parameters.size()]; this.resultSetMetaData = connection.factory.newResultSetMetaData(this, signature); + this.parameterValueBatch = new ArrayList<>(); } @Override protected List<TypedValue> getParameterValues() { return Arrays.asList(slots); } + /** Returns a copy of the current parameter values. + * @return A copied list of the parameter values + */ + protected List<TypedValue> copyParameterValues() { + // For implementing batch update, we need to make a copy of slots, not just a thin reference + // to it as as list. Otherwise, subsequent setFoo(..) calls will alter the underlying array + // and modify our cached TypedValue list. + List<TypedValue> copy = new ArrayList<>(slots.length); + for (TypedValue value : slots) { + copy.add(value); + } + return copy; + } + /** Returns a calendar in the connection's time zone, creating one the first * time this method is called. * @@ -103,6 +120,10 @@ public abstract class AvaticaPreparedStatement return calendar; } + protected List<List<TypedValue>> getParameterValueBatch() { + return this.parameterValueBatch; + } + // implement PreparedStatement public ResultSet executeQuery() throws SQLException { @@ -213,7 +234,21 @@ public abstract class AvaticaPreparedStatement } public void addBatch() throws SQLException { - throw connection.helper.unsupported(); + // Need to copy the parameterValues into a new list, not wrap the array in a list + // as getParameterValues does. 
+ this.parameterValueBatch.add(copyParameterValues()); + } + + @Override public int[] executeBatch() throws SQLException { + // Overriding the implementation in AvaticaStatement. + try { + final int[] updateCounts = getConnection().executeBatchUpdateInternal(this); + return updateCounts; + } finally { + // If we failed to send this batch, that's a problem for the user to handle, not us. + // Make sure we always clear the statements we collected to submit in one RPC. + this.parameterValueBatch.clear(); + } } public void setCharacterStream(int parameterIndex, Reader reader, int length) http://git-wip-us.apache.org/repos/asf/calcite/blob/5dfa3f1e/avatica/core/src/main/java/org/apache/calcite/avatica/AvaticaStatement.java ---------------------------------------------------------------------- diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/AvaticaStatement.java b/avatica/core/src/main/java/org/apache/calcite/avatica/AvaticaStatement.java index cfd1d45..a58fc15 100644 --- a/avatica/core/src/main/java/org/apache/calcite/avatica/AvaticaStatement.java +++ b/avatica/core/src/main/java/org/apache/calcite/avatica/AvaticaStatement.java @@ -24,6 +24,7 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.SQLWarning; import java.sql.Statement; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Objects; @@ -67,6 +68,8 @@ public abstract class AvaticaStatement private Meta.Signature signature; + private final List<String> batchedSql; + protected void setSignature(Meta.Signature signature) { this.signature = signature; } @@ -109,6 +112,7 @@ public abstract class AvaticaStatement } connection.statementMap.put(h.id, this); this.handle = h; + this.batchedSql = new ArrayList<>(); } /** Returns the identifier of the statement, unique within its connection. 
*/ @@ -148,6 +152,25 @@ public abstract class AvaticaStatement + connection.maxRetriesPerExecute + " attempts."); } + /** + * Executes a collection of updates in a single batch RPC. + * + * @return an array of integers mapping to the update count per SQL command. + */ + protected int[] executeBatchInternal() throws SQLException { + for (int i = 0; i < connection.maxRetriesPerExecute; i++) { + try { + Meta.ExecuteBatchResult result = connection.prepareAndUpdateBatch(this, batchedSql); + return result.updateCounts; + } catch (NoSuchStatementException e) { + resetStatement(); + } + } + + throw new RuntimeException("Failed to successfully execute batch update after " + + connection.maxRetriesPerExecute + " attempts"); + } + protected void resetStatement() { // Invalidate the old statement connection.statementMap.remove(handle.id); @@ -359,7 +382,7 @@ public abstract class AvaticaStatement } public void addBatch(String sql) throws SQLException { - throw connection.helper.unsupported(); + this.batchedSql.add(Objects.requireNonNull(sql)); } public void clearBatch() throws SQLException { @@ -367,7 +390,13 @@ public abstract class AvaticaStatement } public int[] executeBatch() throws SQLException { - throw connection.helper.unsupported(); + try { + return executeBatchInternal(); + } finally { + // If we failed to send this batch, that's a problem for the user to handle, not us. + // Make sure we always clear the statements we collected to submit in one RPC. 
+ this.batchedSql.clear(); + } } public AvaticaConnection getConnection() { http://git-wip-us.apache.org/repos/asf/calcite/blob/5dfa3f1e/avatica/core/src/main/java/org/apache/calcite/avatica/Meta.java ---------------------------------------------------------------------- diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/Meta.java b/avatica/core/src/main/java/org/apache/calcite/avatica/Meta.java index 4cc460c..41ca0ee 100644 --- a/avatica/core/src/main/java/org/apache/calcite/avatica/Meta.java +++ b/avatica/core/src/main/java/org/apache/calcite/avatica/Meta.java @@ -228,6 +228,24 @@ public interface Meta { ExecuteResult prepareAndExecute(StatementHandle h, String sql, long maxRowCount, PrepareCallback callback) throws NoSuchStatementException; + /** Prepares a statement and then executes a number of SQL commands in one pass. + * + * @param h Statement handle + * @param sqlCommands SQL commands to run + * @return An array of update counts containing one element for each command in the batch. + */ + ExecuteBatchResult prepareAndExecuteBatch(StatementHandle h, List<String> sqlCommands) + throws NoSuchStatementException; + + /** Executes a collection of bound parameter values on a prepared statement. + * + * @param h Statement handle + * @param parameterValues A collection of list of typed values, one list per batch + * @return An array of update counts containing one element for each command in the batch. + */ + ExecuteBatchResult executeBatch(StatementHandle h, List<List<TypedValue>> parameterValues) + throws NoSuchStatementException; + /** Returns a frame of rows. * * <p>The frame describes whether there may be another frame. If there is not @@ -424,6 +442,17 @@ public interface Meta { } } + /** + * Response from a collection of SQL commands or parameter values in a single batch. 
+ */ + class ExecuteBatchResult { + public final int[] updateCounts; + + public ExecuteBatchResult(int[] updateCounts) { + this.updateCounts = Objects.requireNonNull(updateCounts); + } + } + /** Meta data from which a result set can be constructed. * * <p>If {@code updateCount} is not -1, the result is just a count. A result
