Repository: calcite Updated Branches: refs/heads/master 48e434263 -> 08c966b98
[CALCITE-1254] Implement Statement#executeLargeBatch() Further work on executeLargeBatch with the help of jhyde: * Remove usage of deprecated prepareAndExecute * saturated cast long to int * do not call deprecated methods internally * make DatabaseMetaData.supportsBatchUpdates return true * JdbcMeta calls executeLargeBatch if underlying DB supports it * clarify javadoc of Meta.closeStatement Project: http://git-wip-us.apache.org/repos/asf/calcite/repo Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/08c966b9 Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/08c966b9 Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/08c966b9 Branch: refs/heads/master Commit: 08c966b9828f534ba3d8d2ccf052de7d887c70ef Parents: 48e4342 Author: Josh Elser <[email protected]> Authored: Sat May 28 10:26:10 2016 -0700 Committer: Josh Elser <[email protected]> Committed: Tue May 31 18:44:41 2016 -0400 ---------------------------------------------------------------------- .../calcite/avatica/AvaticaConnection.java | 13 ++-- .../avatica/AvaticaDatabaseMetaData.java | 2 +- .../avatica/AvaticaPreparedStatement.java | 7 ++- .../calcite/avatica/AvaticaStatement.java | 20 +++--- .../apache/calcite/avatica/AvaticaUtils.java | 61 +++++++++++++++++++ .../java/org/apache/calcite/avatica/Meta.java | 35 +++++++---- .../apache/calcite/avatica/proto/Responses.java | 64 ++++++++++---------- .../calcite/avatica/remote/LocalService.java | 6 +- .../calcite/avatica/remote/RemoteMeta.java | 9 ++- .../apache/calcite/avatica/remote/Service.java | 11 ++-- avatica/core/src/main/protobuf/responses.proto | 2 +- .../calcite/avatica/AvaticaStatementTest.java | 51 ++++++++++++++++ .../remote/ProtobufSerializationTest.java | 10 ++- .../remote/ProtobufTranslationImplTest.java | 2 +- .../calcite/avatica/test/AvaticaUtilsTest.java | 10 +++ .../apache/calcite/avatica/jdbc/JdbcMeta.java | 11 ++-- 16 files changed, 233 insertions(+), 81 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/calcite/blob/08c966b9/avatica/core/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java ---------------------------------------------------------------------- diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java b/avatica/core/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java index 69a60ec..f0524e7 100644 --- a/avatica/core/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java +++ b/avatica/core/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java @@ -523,15 +523,12 @@ public abstract class AvaticaConnection implements Connection { * @param pstmt The prepared statement. * @return An array of update counts containing one element for each command in the batch. */ - protected int[] executeBatchUpdateInternal(AvaticaPreparedStatement pstmt) throws SQLException { + protected long[] executeBatchUpdateInternal(AvaticaPreparedStatement pstmt) throws SQLException { try { // Get the handle from the statement Meta.StatementHandle handle = pstmt.handle; // Execute it against meta - final Meta.ExecuteBatchResult executeBatchResult = - meta.executeBatch(handle, pstmt.getParameterValueBatch()); - // Send back just the update counts - return executeBatchResult.updateCounts; + return meta.executeBatch(handle, pstmt.getParameterValueBatch()).updateCounts; } catch (Exception e) { throw helper.createException(e.getMessage(), e); } @@ -610,7 +607,11 @@ public abstract class AvaticaConnection implements Connection { } } }; - return meta.prepareAndExecute(statement.handle, sql, maxRowCount, callback); + // The old semantics were that maxRowCount was also treated as the maximum number of + // elements in the first Frame of results. A value of -1 would also preserve this, but an + // explicit (positive) number is easier to follow, IMO. + return meta.prepareAndExecute(statement.handle, sql, maxRowCount, + AvaticaUtils.toSaturatedInt(maxRowCount), callback); } protected ExecuteBatchResult prepareAndUpdateBatch(final AvaticaStatement statement, http://git-wip-us.apache.org/repos/asf/calcite/blob/08c966b9/avatica/core/src/main/java/org/apache/calcite/avatica/AvaticaDatabaseMetaData.java ---------------------------------------------------------------------- diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/AvaticaDatabaseMetaData.java b/avatica/core/src/main/java/org/apache/calcite/avatica/AvaticaDatabaseMetaData.java index 0345150..7182968 100644 --- a/avatica/core/src/main/java/org/apache/calcite/avatica/AvaticaDatabaseMetaData.java +++ b/avatica/core/src/main/java/org/apache/calcite/avatica/AvaticaDatabaseMetaData.java @@ -1100,7 +1100,7 @@ public class AvaticaDatabaseMetaData implements AvaticaSpecificDatabaseMetaData } public boolean supportsBatchUpdates() throws SQLException { - return false; + return true; } public ResultSet getUDTs( http://git-wip-us.apache.org/repos/asf/calcite/blob/08c966b9/avatica/core/src/main/java/org/apache/calcite/avatica/AvaticaPreparedStatement.java ---------------------------------------------------------------------- diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/AvaticaPreparedStatement.java b/avatica/core/src/main/java/org/apache/calcite/avatica/AvaticaPreparedStatement.java index e83a785..5e25a03 100644 --- a/avatica/core/src/main/java/org/apache/calcite/avatica/AvaticaPreparedStatement.java +++ b/avatica/core/src/main/java/org/apache/calcite/avatica/AvaticaPreparedStatement.java @@ -244,10 +244,13 @@ public abstract class AvaticaPreparedStatement } @Override public int[] executeBatch() throws SQLException { + return AvaticaUtils.toSaturatedInts(executeLargeBatch()); + } + + public long[] executeLargeBatch() throws SQLException { // Overriding the implementation in AvaticaStatement. try { - final int[] updateCounts = getConnection().executeBatchUpdateInternal(this); - return updateCounts; + return getConnection().executeBatchUpdateInternal(this); } finally { // If we failed to send this batch, that's a problem for the user to handle, not us. // Make sure we always clear the statements we collected to submit in one RPC. http://git-wip-us.apache.org/repos/asf/calcite/blob/08c966b9/avatica/core/src/main/java/org/apache/calcite/avatica/AvaticaStatement.java ---------------------------------------------------------------------- diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/AvaticaStatement.java b/avatica/core/src/main/java/org/apache/calcite/avatica/AvaticaStatement.java index 82e4443..73c16cd 100644 --- a/avatica/core/src/main/java/org/apache/calcite/avatica/AvaticaStatement.java +++ b/avatica/core/src/main/java/org/apache/calcite/avatica/AvaticaStatement.java @@ -155,13 +155,12 @@ public abstract class AvaticaStatement /** * Executes a collection of updates in a single batch RPC. * - * @return an array of integers mapping to the update count per SQL command. + * @return an array of long mapping to the update count per SQL command. */ - protected int[] executeBatchInternal() throws SQLException { + protected long[] executeBatchInternal() throws SQLException { for (int i = 0; i < connection.maxRetriesPerExecute; i++) { try { - Meta.ExecuteBatchResult result = connection.prepareAndUpdateBatch(this, batchedSql); - return result.updateCounts; + return connection.prepareAndUpdateBatch(this, batchedSql).updateCounts; } catch (NoSuchStatementException e) { resetStatement(); } @@ -219,7 +218,7 @@ public abstract class AvaticaStatement } public final int executeUpdate(String sql) throws SQLException { - return (int) executeLargeUpdate(sql); + return AvaticaUtils.toSaturatedInt(executeLargeUpdate(sql)); } public long executeLargeUpdate(String sql) throws SQLException { @@ -266,7 +265,7 @@ public abstract class AvaticaStatement } public final int getMaxRows() { - return (int) getLargeMaxRows(); + return AvaticaUtils.toSaturatedInt(getLargeMaxRows()); } public long getLargeMaxRows() { @@ -346,7 +345,7 @@ public abstract class AvaticaStatement } public int getUpdateCount() throws SQLException { - return (int) updateCount; + return AvaticaUtils.toSaturatedInt(updateCount); } public long getLargeUpdateCount() throws SQLException { @@ -390,12 +389,16 @@ public abstract class AvaticaStatement } public int[] executeBatch() throws SQLException { + return AvaticaUtils.toSaturatedInts(executeLargeBatch()); + } + + public long[] executeLargeBatch() throws SQLException { try { return executeBatchInternal(); } finally { // If we failed to send this batch, that's a problem for the user to handle, not us. // Make sure we always clear the statements we collected to submit in one RPC. - this.batchedSql.clear(); + clearBatch(); } } @@ -551,6 +554,7 @@ public abstract class AvaticaStatement } return parameterValues; } + } // End AvaticaStatement.java http://git-wip-us.apache.org/repos/asf/calcite/blob/08c966b9/avatica/core/src/main/java/org/apache/calcite/avatica/AvaticaUtils.java ---------------------------------------------------------------------- diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/AvaticaUtils.java b/avatica/core/src/main/java/org/apache/calcite/avatica/AvaticaUtils.java index a999f19..845dde5 100644 --- a/avatica/core/src/main/java/org/apache/calcite/avatica/AvaticaUtils.java +++ b/avatica/core/src/main/java/org/apache/calcite/avatica/AvaticaUtils.java @@ -25,6 +25,7 @@ import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodType; import java.lang.reflect.Field; import java.nio.charset.StandardCharsets; +import java.sql.PreparedStatement; import java.sql.SQLException; import java.sql.Statement; import java.util.AbstractList; @@ -44,6 +45,8 @@ public class AvaticaUtils { method(long.class, Statement.class, "getLargeMaxRows"); private static final MethodHandle GET_LARGE_UPDATE_COUNT = method(void.class, Statement.class, "getLargeUpdateCount"); + private static final MethodHandle EXECUTE_LARGE_BATCH = + method(long[].class, Statement.class, "executeLargeBatch"); private static final Set<String> UNIQUE_STRINGS = new HashSet<>(); @@ -311,6 +314,26 @@ public class AvaticaUtils { return statement.getUpdateCount(); } + /** Invokes {@code Statement#executeLargeBatch}, falling back on + * {@link PreparedStatement#executeBatch} if the method does not exist + * (before JDK 1.8) or throws {@link UnsupportedOperationException}. */ + public static long[] executeLargeBatch(Statement statement) + throws SQLException { + if (EXECUTE_LARGE_BATCH != null) { + try { + // Call Statement.executeLargeBatch + return (long[]) EXECUTE_LARGE_BATCH.invokeExact(); + } catch (UnsupportedOperationException e) { + // ignore, and fall through to call Statement.executeBatch + } catch (Error | RuntimeException | SQLException e) { + throw e; + } catch (Throwable e) { + throw new RuntimeException(e); + } + } + return toLongs(statement.executeBatch()); + } + /** Generates a string that is unique in the execution of the JVM. * It is used by tests to ensure that they create distinct temporary tables. * The strings are never thrown away, so don't put too much in there! @@ -324,6 +347,44 @@ public class AvaticaUtils { return s; } } + + /** Converts a {@code long} to {@code int}, rounding as little as possible + * if the value is outside the legal range for an {@code int}. */ + public static int toSaturatedInt(long value) { + if (value > Integer.MAX_VALUE) { + return Integer.MAX_VALUE; + } + if (value < Integer.MIN_VALUE) { + return Integer.MIN_VALUE; + } + return (int) value; + } + + /** + * Converts an array of {@code long} values to an array of {@code int} + * values, truncating values outside the legal range for an {@code int} + * to {@link Integer#MIN_VALUE} or {@link Integer#MAX_VALUE}. + * + * @param longs An array of {@code long}s + * @return An array of {@code int}s + */ + public static int[] toSaturatedInts(long[] longs) { + final int[] ints = new int[longs.length]; + for (int i = 0; i < longs.length; i++) { + ints[i] = toSaturatedInt(longs[i]); + } + return ints; + } + + /** Converts an array of {@code int} values to an array of {@code long} + * values. */ + public static long[] toLongs(int[] ints) { + final long[] longs = new long[ints.length]; + for (int i = 0; i < ints.length; i++) { + longs[i] = ints[i]; + } + return longs; + } } // End AvaticaUtils.java http://git-wip-us.apache.org/repos/asf/calcite/blob/08c966b9/avatica/core/src/main/java/org/apache/calcite/avatica/Meta.java ---------------------------------------------------------------------- diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/Meta.java b/avatica/core/src/main/java/org/apache/calcite/avatica/Meta.java index 829ab5a..c7df281 100644 --- a/avatica/core/src/main/java/org/apache/calcite/avatica/Meta.java +++ b/avatica/core/src/main/java/org/apache/calcite/avatica/Meta.java @@ -225,7 +225,8 @@ public interface Meta { * first frame of data * @deprecated See {@link #prepareAndExecute(StatementHandle, String, long, int, PrepareCallback)} */ - @Deprecated ExecuteResult prepareAndExecute(StatementHandle h, String sql, + @Deprecated // to be removed before 2.0 + ExecuteResult prepareAndExecute(StatementHandle h, String sql, long maxRowCount, PrepareCallback callback) throws NoSuchStatementException; /** Prepares and executes a statement. @@ -234,7 +235,9 @@ public interface Meta { * @param sql SQL query * @param maxRowCount Maximum number of rows for the entire query. Negative for no limit * (different meaning than JDBC). - * @param maxRowsInFirstFrame Maximum number of rows for the first frame. + * @param maxRowsInFirstFrame Maximum number of rows for the first frame. This value should + * always be less than or equal to {@code maxRowCount} as the number of results are guaranteed + * to be restricted by {@code maxRowCount} and the underlying database. * @param callback Callback to lock, clear and assign cursor * * @return Result containing statement ID, and if a query, a result set and @@ -288,7 +291,8 @@ public interface Meta { * @return Execute result * @deprecated See {@link #execute(StatementHandle, List, int)} */ - @Deprecated ExecuteResult execute(StatementHandle h, List<TypedValue> parameterValues, + @Deprecated // to be removed before 2.0 + ExecuteResult execute(StatementHandle h, List<TypedValue> parameterValues, long maxRowCount) throws NoSuchStatementException; /** Executes a prepared statement. @@ -307,7 +311,13 @@ public interface Meta { */ StatementHandle createStatement(ConnectionHandle ch); - /** Closes a statement. */ + /** Closes a statement. + * + * <p>If the statement handle is not known, or is already closed, does + * nothing. + * + * @param h Statement handle + */ void closeStatement(StatementHandle h); /** @@ -326,29 +336,30 @@ public interface Meta { void closeConnection(ConnectionHandle ch); /** - * Re-set the {@link ResultSet} on a Statement. Not a JDBC method. + * Re-sets the {@link ResultSet} on a Statement. Not a JDBC method. + * * @return True if there are results to fetch after resetting to the given offset. False otherwise */ boolean syncResults(StatementHandle sh, QueryState state, long offset) throws NoSuchStatementException; /** - * Makes all changes since the last commit/rollback permanent. Analogy to + * Makes all changes since the last commit/rollback permanent. Analogous to * {@link Connection#commit()}. * - * @param ch A reference to the real JDBC Connection. + * @param ch A reference to the real JDBC Connection */ void commit(ConnectionHandle ch); /** - * Undoes all changes since the last commit/rollback. Analogy to + * Undoes all changes since the last commit/rollback. Analogous to * {@link Connection#rollback()}; * - * @param ch A reference to the real JDBC Connection. + * @param ch A reference to the real JDBC Connection */ void rollback(ConnectionHandle ch); - /** Sync client and server view of connection properties. + /** Synchronizes client and server view of connection properties. * * <p>Note: this interface is considered "experimental" and may undergo further changes as this * functionality is extended to other aspects of state management for @@ -522,9 +533,9 @@ public interface Meta { * Response from a collection of SQL commands or parameter values in a single batch. */ class ExecuteBatchResult { - public final int[] updateCounts; + public final long[] updateCounts; - public ExecuteBatchResult(int[] updateCounts) { + public ExecuteBatchResult(long[] updateCounts) { this.updateCounts = Objects.requireNonNull(updateCounts); } } http://git-wip-us.apache.org/repos/asf/calcite/blob/08c966b9/avatica/core/src/main/java/org/apache/calcite/avatica/proto/Responses.java ---------------------------------------------------------------------- diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/proto/Responses.java b/avatica/core/src/main/java/org/apache/calcite/avatica/proto/Responses.java index ca24f86..9070675 100644 --- a/avatica/core/src/main/java/org/apache/calcite/avatica/proto/Responses.java +++ b/avatica/core/src/main/java/org/apache/calcite/avatica/proto/Responses.java @@ -11627,17 +11627,17 @@ package org.apache.calcite.avatica.proto; int getStatementId(); /** - * <code>repeated uint32 update_counts = 3;</code> + * <code>repeated uint64 update_counts = 3;</code> */ - java.util.List<java.lang.Integer> getUpdateCountsList(); + java.util.List<java.lang.Long> getUpdateCountsList(); /** - * <code>repeated uint32 update_counts = 3;</code> + * <code>repeated uint64 update_counts = 3;</code> */ int getUpdateCountsCount(); /** - * <code>repeated uint32 update_counts = 3;</code> + * <code>repeated uint64 update_counts = 3;</code> */ - int getUpdateCounts(int index); + long getUpdateCounts(int index); /** * <code>optional bool missing_statement = 4;</code> @@ -11720,21 +11720,21 @@ package org.apache.calcite.avatica.proto; } case 24: { if (!((mutable_bitField0_ & 0x00000004) == 0x00000004)) { - updateCounts_ = new java.util.ArrayList<java.lang.Integer>(); + updateCounts_ = new java.util.ArrayList<java.lang.Long>(); mutable_bitField0_ |= 0x00000004; } - updateCounts_.add(input.readUInt32()); + updateCounts_.add(input.readUInt64()); break; } case 26: { int length = input.readRawVarint32(); int limit = input.pushLimit(length); if (!((mutable_bitField0_ & 0x00000004) == 0x00000004) && input.getBytesUntilLimit() > 0) { - updateCounts_ = new java.util.ArrayList<java.lang.Integer>(); + updateCounts_ = new java.util.ArrayList<java.lang.Long>(); mutable_bitField0_ |= 0x00000004; } while (input.getBytesUntilLimit() > 0) { - updateCounts_.add(input.readUInt32()); + updateCounts_.add(input.readUInt64()); } input.popLimit(limit); break; @@ -11829,24 +11829,24 @@ package org.apache.calcite.avatica.proto; } public static final int UPDATE_COUNTS_FIELD_NUMBER = 3; - private java.util.List<java.lang.Integer> updateCounts_; + private java.util.List<java.lang.Long> updateCounts_; /** - * <code>repeated uint32 update_counts = 3;</code> + * <code>repeated uint64 update_counts = 3;</code> */ - public java.util.List<java.lang.Integer> + public java.util.List<java.lang.Long> getUpdateCountsList() { return updateCounts_; } /** - * <code>repeated uint32 update_counts = 3;</code> + * <code>repeated uint64 update_counts = 3;</code> */ public int getUpdateCountsCount() { return updateCounts_.size(); } /** - * <code>repeated uint32 update_counts = 3;</code> + * <code>repeated uint64 update_counts = 3;</code> */ - public int getUpdateCounts(int index) { + public long getUpdateCounts(int index) { return updateCounts_.get(index); } private int updateCountsMemoizedSerializedSize = -1; @@ -11909,7 +11909,7 @@ package org.apache.calcite.avatica.proto; output.writeRawVarint32(updateCountsMemoizedSerializedSize); } for (int i = 0; i < updateCounts_.size(); i++) { - output.writeUInt32NoTag(updateCounts_.get(i)); + output.writeUInt64NoTag(updateCounts_.get(i)); } if (missingStatement_ != false) { output.writeBool(4, missingStatement_); @@ -11935,7 +11935,7 @@ package org.apache.calcite.avatica.proto; int dataSize = 0; for (int i = 0; i < updateCounts_.size(); i++) { dataSize += com.google.protobuf.CodedOutputStream - .computeUInt32SizeNoTag(updateCounts_.get(i)); + .computeUInt64SizeNoTag(updateCounts_.get(i)); } size += dataSize; if (!getUpdateCountsList().isEmpty()) { @@ -12280,56 +12280,56 @@ package org.apache.calcite.avatica.proto; return this; } - private java.util.List<java.lang.Integer> updateCounts_ = java.util.Collections.emptyList(); + private java.util.List<java.lang.Long> updateCounts_ = java.util.Collections.emptyList(); private void ensureUpdateCountsIsMutable() { if (!((bitField0_ & 0x00000004) == 0x00000004)) { - updateCounts_ = new java.util.ArrayList<java.lang.Integer>(updateCounts_); + updateCounts_ = new java.util.ArrayList<java.lang.Long>(updateCounts_); bitField0_ |= 0x00000004; } } /** - * <code>repeated uint32 update_counts = 3;</code> + * <code>repeated uint64 update_counts = 3;</code> */ - public java.util.List<java.lang.Integer> + public java.util.List<java.lang.Long> getUpdateCountsList() { return java.util.Collections.unmodifiableList(updateCounts_); } /** - * <code>repeated uint32 update_counts = 3;</code> + * <code>repeated uint64 update_counts = 3;</code> */ public int getUpdateCountsCount() { return updateCounts_.size(); } /** - * <code>repeated uint32 update_counts = 3;</code> + * <code>repeated uint64 update_counts = 3;</code> */ - public int getUpdateCounts(int index) { + public long getUpdateCounts(int index) { return updateCounts_.get(index); } /** - * <code>repeated uint32 update_counts = 3;</code> + * <code>repeated uint64 update_counts = 3;</code> */ public Builder setUpdateCounts( - int index, int value) { + int index, long value) { ensureUpdateCountsIsMutable(); updateCounts_.set(index, value); onChanged(); return this; } /** - * <code>repeated uint32 update_counts = 3;</code> + * <code>repeated uint64 update_counts = 3;</code> */ - public Builder addUpdateCounts(int value) { + public Builder addUpdateCounts(long value) { ensureUpdateCountsIsMutable(); updateCounts_.add(value); onChanged(); return this; } /** - * <code>repeated uint32 update_counts = 3;</code> + * <code>repeated uint64 update_counts = 3;</code> */ public Builder addAllUpdateCounts( - java.lang.Iterable<? extends java.lang.Integer> values) { + java.lang.Iterable<? extends java.lang.Long> values) { ensureUpdateCountsIsMutable(); com.google.protobuf.AbstractMessageLite.Builder.addAll( values, updateCounts_); @@ -12337,7 +12337,7 @@ package org.apache.calcite.avatica.proto; return this; } /** - * <code>repeated uint32 update_counts = 3;</code> + * <code>repeated uint64 update_counts = 3;</code> */ public Builder clearUpdateCounts() { updateCounts_ = java.util.Collections.emptyList(); @@ -12692,7 +12692,7 @@ package org.apache.calcite.avatica.proto; "ress\030\001 \001(\t\"\020\n\016CommitResponse\"\022\n\020Rollback" + "Response\"\225\001\n\024ExecuteBatchResponse\022\025\n\rcon" + "nection_id\030\001 \001(\t\022\024\n\014statement_id\030\002 \001(\r\022\025", - "\n\rupdate_counts\030\003 \003(\r\022\031\n\021missing_stateme" + + "\n\rupdate_counts\030\003 \003(\004\022\031\n\021missing_stateme" + "nt\030\004 \001(\010\022\036\n\010metadata\030\005 \001(\0132\014.RpcMetadata" + "B\"\n org.apache.calcite.avatica.protob\006pr" + "oto3" http://git-wip-us.apache.org/repos/asf/calcite/blob/08c966b9/avatica/core/src/main/java/org/apache/calcite/avatica/remote/LocalService.java ---------------------------------------------------------------------- diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/LocalService.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/LocalService.java index a15d55f..1562446 100644 --- a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/LocalService.java +++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/LocalService.java @@ -16,8 +16,8 @@ */ package org.apache.calcite.avatica.remote; +import org.apache.calcite.avatica.AvaticaUtils; import org.apache.calcite.avatica.Meta; - import org.apache.calcite.avatica.Meta.ExecuteBatchResult; import org.apache.calcite.avatica.MetaImpl; import org.apache.calcite.avatica.MissingResultsException; @@ -218,7 +218,7 @@ public class LocalService implements Service { try { final Meta.ExecuteResult executeResult = meta.prepareAndExecute(sh, request.sql, request.maxRowCount, - new Meta.PrepareCallback() { + request.maxRowsInFirstFrame, new Meta.PrepareCallback() { @Override public Object getMonitor() { return LocalService.class; } @@ -266,7 +266,7 @@ public class LocalService implements Service { try (final Context ctx = executeTimer.start()) { try { final Meta.ExecuteResult executeResult = meta.execute(request.statementHandle, - request.parameterValues, request.maxRowCount); + request.parameterValues, AvaticaUtils.toSaturatedInt(request.maxRowCount)); final List<ResultSetResponse> results = new ArrayList<>(executeResult.resultSets.size()); for (Meta.MetaResultSet metaResultSet : executeResult.resultSets) { http://git-wip-us.apache.org/repos/asf/calcite/blob/08c966b9/avatica/core/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java ---------------------------------------------------------------------- diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java index de6419a..7ee2226 100644 --- a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java +++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java @@ -19,6 +19,7 @@ package org.apache.calcite.avatica.remote; import org.apache.calcite.avatica.AvaticaConnection; import org.apache.calcite.avatica.AvaticaConnection.CallableWithoutException; import org.apache.calcite.avatica.AvaticaParameter; +import org.apache.calcite.avatica.AvaticaUtils; import org.apache.calcite.avatica.ColumnMetaData; import org.apache.calcite.avatica.ConnectionPropertiesImpl; import org.apache.calcite.avatica.Meta; @@ -245,7 +246,11 @@ class RemoteMeta extends MetaImpl { @Override public ExecuteResult prepareAndExecute(StatementHandle h, String sql, long maxRowCount, PrepareCallback callback) throws NoSuchStatementException { - return prepareAndExecute(h, sql, maxRowCount, (int) maxRowCount, callback); + // The old semantics were that maxRowCount was also treated as the maximum number of + // elements in the first Frame of results. A value of -1 would also preserve this, but an + // explicit (positive) number is easier to follow, IMO. + return prepareAndExecute(h, sql, maxRowCount, AvaticaUtils.toSaturatedInt(maxRowCount), + callback); } @Override public ExecuteResult prepareAndExecute(final StatementHandle h, final String sql, @@ -324,7 +329,7 @@ class RemoteMeta extends MetaImpl { @Override public ExecuteResult execute(StatementHandle h, List<TypedValue> parameterValues, long maxRowCount) throws NoSuchStatementException { - return execute(h, parameterValues, (int) maxRowCount); + return execute(h, parameterValues, AvaticaUtils.toSaturatedInt(maxRowCount)); } @Override public ExecuteResult execute(final StatementHandle h, http://git-wip-us.apache.org/repos/asf/calcite/blob/08c966b9/avatica/core/src/main/java/org/apache/calcite/avatica/remote/Service.java ---------------------------------------------------------------------- diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/Service.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/Service.java index b3accd9..affed00 100644 --- a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/Service.java +++ b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/Service.java @@ -19,6 +19,7 @@ package org.apache.calcite.avatica.remote; import org.apache.calcite.avatica.AvaticaClientRuntimeException; import org.apache.calcite.avatica.AvaticaConnection; import org.apache.calcite.avatica.AvaticaSeverity; +import org.apache.calcite.avatica.AvaticaUtils; import org.apache.calcite.avatica.BuiltInConnectionProperty; import org.apache.calcite.avatica.ConnectionPropertiesImpl; import org.apache.calcite.avatica.Meta; @@ -904,7 +905,7 @@ public interface Service { public PrepareAndExecuteRequest(String connectionId, int statementId, String sql, long maxRowCount) { - this(connectionId, statementId, sql, maxRowCount, (int) maxRowCount); + this(connectionId, statementId, sql, maxRowCount, AvaticaUtils.toSaturatedInt(maxRowCount)); } @JsonCreator @@ -3028,7 +3029,7 @@ public interface Service { public final String connectionId; public final int statementId; - public final int[] updateCounts; + public final long[] updateCounts; public final boolean missingStatement; public final RpcMetadataResponse rpcMetadata; @@ -3043,7 +3044,7 @@ public interface Service { @JsonCreator public ExecuteBatchResponse(@JsonProperty("connectionId") String connectionId, @JsonProperty("statementId") int statementId, - @JsonProperty("updateCounts") int[] updateCounts, + @JsonProperty("updateCounts") long[] updateCounts, @JsonProperty("missingStatement") boolean missingStatement, @JsonProperty("rpcMetadata") RpcMetadataResponse rpcMetadata) { this.connectionId = connectionId; @@ -3075,9 +3076,9 @@ public interface Service { Responses.ExecuteBatchResponse msg = ProtobufService.castProtobufMessage(genericMsg, Responses.ExecuteBatchResponse.class); - int[] updateCounts = new int[msg.getUpdateCountsCount()]; + long[] updateCounts = new long[msg.getUpdateCountsCount()]; int i = 0; - for (Integer updateCount : msg.getUpdateCountsList()) { + for (Long updateCount : msg.getUpdateCountsList()) { updateCounts[i++] = updateCount; } http://git-wip-us.apache.org/repos/asf/calcite/blob/08c966b9/avatica/core/src/main/protobuf/responses.proto ---------------------------------------------------------------------- diff --git a/avatica/core/src/main/protobuf/responses.proto b/avatica/core/src/main/protobuf/responses.proto index 47d73ab..a3cd3d2 100644 --- a/avatica/core/src/main/protobuf/responses.proto +++ b/avatica/core/src/main/protobuf/responses.proto @@ -129,7 +129,7 @@ message RollbackResponse { message ExecuteBatchResponse { string connection_id = 1; uint32 statement_id = 2; - repeated uint32 update_counts = 3; + repeated uint64 update_counts = 3; bool missing_statement = 4; // Did the request fail because of no-cached statement RpcMetadata metadata = 5; } http://git-wip-us.apache.org/repos/asf/calcite/blob/08c966b9/avatica/core/src/test/java/org/apache/calcite/avatica/AvaticaStatementTest.java ---------------------------------------------------------------------- diff --git a/avatica/core/src/test/java/org/apache/calcite/avatica/AvaticaStatementTest.java b/avatica/core/src/test/java/org/apache/calcite/avatica/AvaticaStatementTest.java new file mode 100644 index 0000000..5f6b56a --- /dev/null +++ b/avatica/core/src/test/java/org/apache/calcite/avatica/AvaticaStatementTest.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.avatica; + +import org.junit.Before; +import org.junit.Test; + +import java.sql.SQLException; + +import static org.junit.Assert.assertArrayEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Test class for AvaticaStatement + */ +public class AvaticaStatementTest { + + private AvaticaStatement statement; + + @Before public void setup() { + statement = mock(AvaticaStatement.class); + } + + @Test public void testUpdateCounts() throws SQLException { + long[] longValues = new long[] {-1, -3, 1, 5, ((long) Integer.MAX_VALUE) + 1}; + int[] intValues = new int[] {-1, -3, 1, 5, Integer.MAX_VALUE}; + when(statement.executeBatch()).thenCallRealMethod(); + when(statement.executeLargeBatch()).thenCallRealMethod(); + when(statement.executeBatchInternal()).thenReturn(longValues); + + assertArrayEquals(intValues, statement.executeBatch()); + assertArrayEquals(longValues, statement.executeLargeBatch()); + } +} + +// End AvaticaStatementTest.java http://git-wip-us.apache.org/repos/asf/calcite/blob/08c966b9/avatica/core/src/test/java/org/apache/calcite/avatica/remote/ProtobufSerializationTest.java ---------------------------------------------------------------------- diff --git a/avatica/core/src/test/java/org/apache/calcite/avatica/remote/ProtobufSerializationTest.java b/avatica/core/src/test/java/org/apache/calcite/avatica/remote/ProtobufSerializationTest.java index d439c83..b3a08c0 100644 --- a/avatica/core/src/test/java/org/apache/calcite/avatica/remote/ProtobufSerializationTest.java +++ b/avatica/core/src/test/java/org/apache/calcite/avatica/remote/ProtobufSerializationTest.java @@ -16,6 +16,7 @@ */ package org.apache.calcite.avatica.remote; +import org.apache.calcite.avatica.AvaticaUtils; import org.apache.calcite.avatica.ColumnMetaData.Rep; import org.apache.calcite.avatica.Meta.Signature; import org.apache.calcite.avatica.Meta.StatementHandle; @@ -130,7 +131,8 @@ public class ProtobufSerializationTest { prepareAndExecuteReq.serialize(); assertEquals(maxRowCount, prepareAndExecuteProtoReq.getMaxRowCount()); assertEquals(maxRowCount, prepareAndExecuteProtoReq.getMaxRowsTotal()); - assertEquals((int) maxRowCount, prepareAndExecuteProtoReq.getFirstFrameMaxSize()); + assertEquals(AvaticaUtils.toSaturatedInt(maxRowCount), + prepareAndExecuteProtoReq.getFirstFrameMaxSize()); assertEquals(prepareAndExecuteReq, prepareAndExecuteReq.deserialize(prepareAndExecuteProtoReq)); @@ -175,7 +177,8 @@ public class ProtobufSerializationTest { prepareAndExecuteReq = new Service.PrepareAndExecuteRequest().deserialize(protoPrepare); assertEquals(maxRowCount, prepareAndExecuteReq.maxRowCount); - assertEquals((int) maxRowCount, prepareAndExecuteReq.maxRowsInFirstFrame); + assertEquals(AvaticaUtils.toSaturatedInt(maxRowCount), + prepareAndExecuteReq.maxRowsInFirstFrame); // Both the new and old provided should default to the new (firstFrameMaxSize should be the // the same as what ultimately is set to maxRowCount) @@ -185,7 +188,8 @@ public class ProtobufSerializationTest { prepareAndExecuteReq = new Service.PrepareAndExecuteRequest().deserialize(protoPrepare); assertEquals(maxRowCount, prepareAndExecuteReq.maxRowCount); - assertEquals((int) maxRowCount, prepareAndExecuteReq.maxRowsInFirstFrame); + assertEquals(AvaticaUtils.toSaturatedInt(maxRowCount), + prepareAndExecuteReq.maxRowsInFirstFrame); // Same as previous example, but explicitly setting maxRowsInFirstFrame too protoPrepare = Requests.PrepareAndExecuteRequest.newBuilder(). http://git-wip-us.apache.org/repos/asf/calcite/blob/08c966b9/avatica/core/src/test/java/org/apache/calcite/avatica/remote/ProtobufTranslationImplTest.java ---------------------------------------------------------------------- diff --git a/avatica/core/src/test/java/org/apache/calcite/avatica/remote/ProtobufTranslationImplTest.java b/avatica/core/src/test/java/org/apache/calcite/avatica/remote/ProtobufTranslationImplTest.java index 8dac427..5cf3208 100644 --- a/avatica/core/src/test/java/org/apache/calcite/avatica/remote/ProtobufTranslationImplTest.java +++ b/avatica/core/src/test/java/org/apache/calcite/avatica/remote/ProtobufTranslationImplTest.java @@ -358,7 +358,7 @@ public class ProtobufTranslationImplTest<T> { responses.add(new CommitResponse()); responses.add(new RollbackResponse()); - int[] updateCounts = new int[]{1, 0, 1, 1}; + long[] updateCounts = new long[]{1, 0, 1, 1}; responses.add( new ExecuteBatchResponse("connectionId", 12345, updateCounts, false, rpcMetadata)); http://git-wip-us.apache.org/repos/asf/calcite/blob/08c966b9/avatica/core/src/test/java/org/apache/calcite/avatica/test/AvaticaUtilsTest.java ---------------------------------------------------------------------- diff --git a/avatica/core/src/test/java/org/apache/calcite/avatica/test/AvaticaUtilsTest.java b/avatica/core/src/test/java/org/apache/calcite/avatica/test/AvaticaUtilsTest.java index 8929400..8905508 100644 --- a/avatica/core/src/test/java/org/apache/calcite/avatica/test/AvaticaUtilsTest.java +++ b/avatica/core/src/test/java/org/apache/calcite/avatica/test/AvaticaUtilsTest.java @@ -30,6 +30,7 @@ import java.util.Properties; import java.util.Set; import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; @@ -148,6 +149,15 @@ public class AvaticaUtilsTest { assertThat(env.getString(), is(" ")); } + @Test public void testLongToIntegerTranslation() { + long[] longValues = new long[] {Integer.MIN_VALUE, -5, 0, 1, Integer.MAX_VALUE, + ((long) Integer.MAX_VALUE) + 1L, Long.MAX_VALUE}; + int[] convertedValues = AvaticaUtils.toSaturatedInts(longValues); + int[] intValues = new int[] {Integer.MIN_VALUE, -5, 0, 1, Integer.MAX_VALUE, + Integer.MAX_VALUE, Integer.MAX_VALUE}; + assertArrayEquals(convertedValues, intValues); + } + /** Dummy implementation of {@link ConnectionProperty}. */ private static class ConnectionPropertyImpl implements ConnectionProperty { private final String name; http://git-wip-us.apache.org/repos/asf/calcite/blob/08c966b9/avatica/server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java ---------------------------------------------------------------------- diff --git a/avatica/server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java b/avatica/server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java index c68041f..204421f 100644 --- a/avatica/server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java +++ b/avatica/server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java @@ -710,7 +710,8 @@ public class JdbcMeta implements ProtobufMeta { public ExecuteResult prepareAndExecute(StatementHandle h, String sql, long maxRowCount, PrepareCallback callback) throws NoSuchStatementException { - return prepareAndExecute(h, sql, maxRowCount, (int) maxRowCount, callback); + return prepareAndExecute(h, sql, maxRowCount, AvaticaUtils.toSaturatedInt(maxRowCount), + callback); } public ExecuteResult prepareAndExecute(StatementHandle h, String sql, long maxRowCount, @@ -818,7 +819,7 @@ public class JdbcMeta implements ProtobufMeta { @Override public ExecuteResult execute(StatementHandle h, List<TypedValue> parameterValues, long maxRowCount) throws NoSuchStatementException { - return execute(h, parameterValues, (int) maxRowCount); + return execute(h, parameterValues, AvaticaUtils.toSaturatedInt(maxRowCount)); } @Override public ExecuteResult execute(StatementHandle h, @@ -912,7 +913,7 @@ public class JdbcMeta implements ProtobufMeta { } // Execute the batch and return the results - return new ExecuteBatchResult(stmt.executeBatch()); + return new ExecuteBatchResult(AvaticaUtils.executeLargeBatch(stmt)); } catch (SQLException e) { throw propagate(e); } @@ -944,7 +945,7 @@ public class JdbcMeta implements ProtobufMeta { } preparedStmt.addBatch(); } - return new ExecuteBatchResult(preparedStmt.executeBatch()); + return new ExecuteBatchResult(AvaticaUtils.executeLargeBatch(preparedStmt)); } catch (SQLException e) { throw propagate(e); } @@ -967,7 +968,7 @@ public class JdbcMeta implements ProtobufMeta { } preparedStmt.addBatch(); } - return new ExecuteBatchResult(preparedStmt.executeBatch()); + return new ExecuteBatchResult(AvaticaUtils.executeLargeBatch(preparedStmt)); } catch (SQLException e) { throw propagate(e); }
