[CALCITE-1128] Implement JDBC batch update methods in remote driver

This commit provides an implementation for:

* Statement.addBatch(String)
* PreparedStatement.addBatch()
* PreparedStatement.executeBatch()

The implementation is fairly straightforward except for the addition
of a new server interface: ProtobufMeta. This is a new interface which
the Meta implementation can choose to also implement to provide a "native"
implementation on top of Protobuf objects instead of the Avatica POJOs.

During investigations into Avatica performance pre-1.7.0, it was found
that converting protobufs to POJOs was a very hot code path. This short-circuit
helps us avoid creating extra objects on the heap, and the computation needed
to create them, in what should be a very hot code path for write workloads.

Closes apache/calcite#209


Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/5dfa3f1e
Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/5dfa3f1e
Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/5dfa3f1e

Branch: refs/heads/master
Commit: 5dfa3f1ece54d2f95057c5b5097dc0f7fae693ee
Parents: 857c06b
Author: Josh Elser <[email protected]>
Authored: Thu Mar 3 17:25:56 2016 -0500
Committer: Josh Elser <[email protected]>
Committed: Thu Mar 24 00:25:32 2016 -0400

----------------------------------------------------------------------
 .../calcite/avatica/AvaticaConnection.java      |   25 +
 .../avatica/AvaticaPreparedStatement.java       |   37 +-
 .../calcite/avatica/AvaticaStatement.java       |   33 +-
 .../java/org/apache/calcite/avatica/Meta.java   |   29 +
 .../apache/calcite/avatica/proto/Requests.java  | 2418 +++++++++++++++++-
 .../apache/calcite/avatica/proto/Responses.java |  970 ++++++-
 .../calcite/avatica/remote/JsonService.java     |   16 +
 .../calcite/avatica/remote/LocalService.java    |   32 +
 .../calcite/avatica/remote/ProtobufMeta.java    |   45 +
 .../calcite/avatica/remote/ProtobufService.java |    8 +
 .../avatica/remote/ProtobufTranslationImpl.java |   15 +
 .../calcite/avatica/remote/RemoteMeta.java      |   23 +
 .../apache/calcite/avatica/remote/Service.java  |  276 +-
 .../calcite/avatica/remote/TypedValue.java      |  145 +-
 avatica/core/src/main/protobuf/requests.proto   |   18 +
 avatica/core/src/main/protobuf/responses.proto  |    9 +
 .../avatica/remote/ExecuteBatchRequestTest.java |   79 +
 .../remote/ProtobufTranslationImplTest.java     |   11 +
 .../calcite/avatica/test/JsonHandlerTest.java   |    8 +
 .../apache/calcite/avatica/jdbc/JdbcMeta.java   |   82 +-
 .../calcite/avatica/RemoteDriverTest.java       |  398 +++
 avatica/site/_docs/json_reference.md            |   69 +
 avatica/site/_docs/protobuf_reference.md        |   76 +
 23 files changed, 4775 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/calcite/blob/5dfa3f1e/avatica/core/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java
----------------------------------------------------------------------
diff --git 
a/avatica/core/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java 
b/avatica/core/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java
index 2d89f45..091fe6b 100644
--- 
a/avatica/core/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java
+++ 
b/avatica/core/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java
@@ -16,6 +16,7 @@
  */
 package org.apache.calcite.avatica;
 
+import org.apache.calcite.avatica.Meta.ExecuteBatchResult;
 import org.apache.calcite.avatica.Meta.MetaResultSet;
 import org.apache.calcite.avatica.remote.Service.ErrorResponse;
 import org.apache.calcite.avatica.remote.Service.OpenConnectionRequest;
@@ -505,6 +506,25 @@ public abstract class AvaticaConnection implements 
Connection {
     return statement.openResultSet;
   }
 
+  /** Executes a batch update using an {@link AvaticaPreparedStatement}.
+   *
+   * @param pstmt The prepared statement.
+   * @return An array of update counts containing one element for each command 
in the batch.
+   */
+  protected int[] executeBatchUpdateInternal(AvaticaPreparedStatement pstmt) 
throws SQLException {
+    try {
+      // Get the handle from the statement
+      Meta.StatementHandle handle = pstmt.handle;
+      // Execute it against meta
+      final Meta.ExecuteBatchResult executeBatchResult =
+          meta.executeBatch(handle, pstmt.getParameterValueBatch());
+      // Send back just the update counts
+      return executeBatchResult.updateCounts;
+    } catch (Exception e) {
+      throw helper.createException(e.getMessage(), e);
+    }
+  }
+
   /** Returns whether a a statement is capable of updates and if so,
    * and the statement's {@code updateCount} is still -1, proceeds to
    * get updateCount value from statement's resultSet.
@@ -581,6 +601,11 @@ public abstract class AvaticaConnection implements 
Connection {
     return meta.prepareAndExecute(statement.handle, sql, maxRowCount, 
callback);
   }
 
+  protected ExecuteBatchResult prepareAndUpdateBatch(final AvaticaStatement 
statement,
+      final List<String> queries) throws NoSuchStatementException, 
SQLException {
+    return meta.prepareAndExecuteBatch(statement.handle, queries);
+  }
+
   protected ResultSet createResultSet(Meta.MetaResultSet metaResultSet, 
QueryState state)
       throws SQLException {
     final Meta.StatementHandle h = new Meta.StatementHandle(

http://git-wip-us.apache.org/repos/asf/calcite/blob/5dfa3f1e/avatica/core/src/main/java/org/apache/calcite/avatica/AvaticaPreparedStatement.java
----------------------------------------------------------------------
diff --git 
a/avatica/core/src/main/java/org/apache/calcite/avatica/AvaticaPreparedStatement.java
 
b/avatica/core/src/main/java/org/apache/calcite/avatica/AvaticaPreparedStatement.java
index f3a950a..9b8a292 100644
--- 
a/avatica/core/src/main/java/org/apache/calcite/avatica/AvaticaPreparedStatement.java
+++ 
b/avatica/core/src/main/java/org/apache/calcite/avatica/AvaticaPreparedStatement.java
@@ -35,6 +35,7 @@ import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Time;
 import java.sql.Timestamp;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Calendar;
 import java.util.List;
@@ -52,6 +53,7 @@ public abstract class AvaticaPreparedStatement
   private final ResultSetMetaData resultSetMetaData;
   private Calendar calendar;
   protected final TypedValue[] slots;
+  protected final List<List<TypedValue>> parameterValueBatch;
 
   /**
    * Creates an AvaticaPreparedStatement.
@@ -75,12 +77,27 @@ public abstract class AvaticaPreparedStatement
     this.slots = new TypedValue[signature.parameters.size()];
     this.resultSetMetaData =
         connection.factory.newResultSetMetaData(this, signature);
+    this.parameterValueBatch = new ArrayList<>();
   }
 
   @Override protected List<TypedValue> getParameterValues() {
     return Arrays.asList(slots);
   }
 
+  /** Returns a copy of the current parameter values.
+   * @return A copied list of the parameter values
+   */
+  protected List<TypedValue> copyParameterValues() {
+    // For implementing batch update, we need to make a copy of slots, not 
just a thin reference
+    // to it as as list. Otherwise, subsequent setFoo(..) calls will alter the 
underlying array
+    // and modify our cached TypedValue list.
+    List<TypedValue> copy = new ArrayList<>(slots.length);
+    for (TypedValue value : slots) {
+      copy.add(value);
+    }
+    return copy;
+  }
+
   /** Returns a calendar in the connection's time zone, creating one the first
    * time this method is called.
    *
@@ -103,6 +120,10 @@ public abstract class AvaticaPreparedStatement
     return calendar;
   }
 
+  protected List<List<TypedValue>> getParameterValueBatch() {
+    return this.parameterValueBatch;
+  }
+
   // implement PreparedStatement
 
   public ResultSet executeQuery() throws SQLException {
@@ -213,7 +234,21 @@ public abstract class AvaticaPreparedStatement
   }
 
   public void addBatch() throws SQLException {
-    throw connection.helper.unsupported();
+    // Need to copy the parameterValues into a new list, not wrap the array in 
a list
+    // as getParameterValues does.
+    this.parameterValueBatch.add(copyParameterValues());
+  }
+
+  @Override public int[] executeBatch() throws SQLException {
+    // Overriding the implementation in AvaticaStatement.
+    try {
+      final int[] updateCounts = 
getConnection().executeBatchUpdateInternal(this);
+      return updateCounts;
+    } finally {
+      // If we failed to send this batch, that's a problem for the user to 
handle, not us.
+      // Make sure we always clear the statements we collected to submit in 
one RPC.
+      this.parameterValueBatch.clear();
+    }
   }
 
   public void setCharacterStream(int parameterIndex, Reader reader, int length)

http://git-wip-us.apache.org/repos/asf/calcite/blob/5dfa3f1e/avatica/core/src/main/java/org/apache/calcite/avatica/AvaticaStatement.java
----------------------------------------------------------------------
diff --git 
a/avatica/core/src/main/java/org/apache/calcite/avatica/AvaticaStatement.java 
b/avatica/core/src/main/java/org/apache/calcite/avatica/AvaticaStatement.java
index cfd1d45..a58fc15 100644
--- 
a/avatica/core/src/main/java/org/apache/calcite/avatica/AvaticaStatement.java
+++ 
b/avatica/core/src/main/java/org/apache/calcite/avatica/AvaticaStatement.java
@@ -24,6 +24,7 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.SQLWarning;
 import java.sql.Statement;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
@@ -67,6 +68,8 @@ public abstract class AvaticaStatement
 
   private Meta.Signature signature;
 
+  private final List<String> batchedSql;
+
   protected void setSignature(Meta.Signature signature) {
     this.signature = signature;
   }
@@ -109,6 +112,7 @@ public abstract class AvaticaStatement
     }
     connection.statementMap.put(h.id, this);
     this.handle = h;
+    this.batchedSql = new ArrayList<>();
   }
 
   /** Returns the identifier of the statement, unique within its connection. */
@@ -148,6 +152,25 @@ public abstract class AvaticaStatement
         + connection.maxRetriesPerExecute + " attempts.");
   }
 
+  /**
+   * Executes a collection of updates in a single batch RPC.
+   *
+   * @return an array of integers mapping to the update count per SQL command.
+   */
+  protected int[] executeBatchInternal() throws SQLException {
+    for (int i = 0; i < connection.maxRetriesPerExecute; i++) {
+      try {
+        Meta.ExecuteBatchResult result = 
connection.prepareAndUpdateBatch(this, batchedSql);
+        return result.updateCounts;
+      } catch (NoSuchStatementException e) {
+        resetStatement();
+      }
+    }
+
+    throw new RuntimeException("Failed to successfully execute batch update 
after "
+        +  connection.maxRetriesPerExecute + " attempts");
+  }
+
   protected void resetStatement() {
     // Invalidate the old statement
     connection.statementMap.remove(handle.id);
@@ -359,7 +382,7 @@ public abstract class AvaticaStatement
   }
 
   public void addBatch(String sql) throws SQLException {
-    throw connection.helper.unsupported();
+    this.batchedSql.add(Objects.requireNonNull(sql));
   }
 
   public void clearBatch() throws SQLException {
@@ -367,7 +390,13 @@ public abstract class AvaticaStatement
   }
 
   public int[] executeBatch() throws SQLException {
-    throw connection.helper.unsupported();
+    try {
+      return executeBatchInternal();
+    } finally {
+      // If we failed to send this batch, that's a problem for the user to 
handle, not us.
+      // Make sure we always clear the statements we collected to submit in 
one RPC.
+      this.batchedSql.clear();
+    }
   }
 
   public AvaticaConnection getConnection() {

http://git-wip-us.apache.org/repos/asf/calcite/blob/5dfa3f1e/avatica/core/src/main/java/org/apache/calcite/avatica/Meta.java
----------------------------------------------------------------------
diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/Meta.java 
b/avatica/core/src/main/java/org/apache/calcite/avatica/Meta.java
index 4cc460c..41ca0ee 100644
--- a/avatica/core/src/main/java/org/apache/calcite/avatica/Meta.java
+++ b/avatica/core/src/main/java/org/apache/calcite/avatica/Meta.java
@@ -228,6 +228,24 @@ public interface Meta {
   ExecuteResult prepareAndExecute(StatementHandle h, String sql,
       long maxRowCount, PrepareCallback callback) throws 
NoSuchStatementException;
 
+  /** Prepares a statement and then executes a number of SQL commands in one 
pass.
+   *
+   * @param h Statement handle
+   * @param sqlCommands SQL commands to run
+   * @return An array of update counts containing one element for each command 
in the batch.
+   */
+  ExecuteBatchResult prepareAndExecuteBatch(StatementHandle h, List<String> 
sqlCommands)
+      throws NoSuchStatementException;
+
+  /** Executes a collection of bound parameter values on a prepared statement.
+   *
+   * @param h Statement handle
+   * @param parameterValues A collection of list of typed values, one list per 
batch
+   * @return An array of update counts containing one element for each command 
in the batch.
+   */
+  ExecuteBatchResult executeBatch(StatementHandle h, List<List<TypedValue>> 
parameterValues)
+      throws NoSuchStatementException;
+
   /** Returns a frame of rows.
    *
    * <p>The frame describes whether there may be another frame. If there is not
@@ -424,6 +442,17 @@ public interface Meta {
     }
   }
 
+  /**
+   * Response from a collection of SQL commands or parameter values in a 
single batch.
+   */
+  class ExecuteBatchResult {
+    public final int[] updateCounts;
+
+    public ExecuteBatchResult(int[] updateCounts) {
+      this.updateCounts = Objects.requireNonNull(updateCounts);
+    }
+  }
+
   /** Meta data from which a result set can be constructed.
    *
    * <p>If {@code updateCount} is not -1, the result is just a count. A result

Reply via email to