This is an automated email from the ASF dual-hosted git repository. yanxinyi pushed a commit to branch 4.x in repository https://gitbox.apache.org/repos/asf/phoenix.git
commit b7cb29ee55fce92ea804002867ba3e7c68bc1129 Author: Xinyi Yan <[email protected]> AuthorDate: Tue Sep 29 16:11:41 2020 -0700 PHOENIX-6167 Adding maxMutationCellSizeBytes config and exception Signed-off-by: Xinyi Yan <[email protected]> --- .../apache/phoenix/end2end/MutationStateIT.java | 74 ++++++++++++++++++++++ .../org/apache/phoenix/compile/UpsertCompiler.java | 47 ++++++++++++-- .../apache/phoenix/exception/SQLExceptionCode.java | 11 ++++ .../apache/phoenix/exception/SQLExceptionInfo.java | 29 +++++++++ .../org/apache/phoenix/query/QueryServices.java | 1 + .../apache/phoenix/query/QueryServicesOptions.java | 1 + .../MaxPhoenixColumnSizeExceededException.java | 46 ++++++++++++++ 7 files changed, 203 insertions(+), 6 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java index f6a9993..acaa56a 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java @@ -18,6 +18,7 @@ package org.apache.phoenix.end2end; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -550,4 +551,77 @@ public class MutationStateIT extends ParallelStatsDisabledIT { assertNotNull(PhoenixRuntime.getTableNoCache(conn, tableName)); } } + + @Test + public void testUpsertMaxColumnAllowanceForSingleCellArrayWithOffsets() throws Exception { + testUpsertColumnExceedsMaxAllowanceSize("SINGLE_CELL_ARRAY_WITH_OFFSETS"); + } + + @Test + public void testUpsertMaxColumnAllowanceForOneCellPerColumn() throws Exception { + testUpsertColumnExceedsMaxAllowanceSize("ONE_CELL_PER_COLUMN"); + } + + public void testUpsertColumnExceedsMaxAllowanceSize(String storageScheme) throws Exception { + Properties connectionProperties = 
new Properties(); + connectionProperties.setProperty(QueryServices.HBASE_CLIENT_KEYVALUE_MAXSIZE, "20"); + try (PhoenixConnection connection = + (PhoenixConnection) DriverManager.getConnection(getUrl(), connectionProperties)) { + String fullTableName = generateUniqueName(); + String pk1Name = generateUniqueName(); + String pk2Name = generateUniqueName(); + String ddl = "CREATE IMMUTABLE TABLE " + fullTableName + + " (" + pk1Name + " VARCHAR(15) NOT NULL, " + pk2Name + " VARCHAR(15) NOT NULL, " + + "PAYLOAD1 VARCHAR, PAYLOAD2 VARCHAR,PAYLOAD3 VARCHAR " + + "CONSTRAINT PK PRIMARY KEY (" + pk1Name + "," + pk2Name+ ")) " + + "IMMUTABLE_STORAGE_SCHEME =" + storageScheme; + try (Statement stmt = connection.createStatement()) { + stmt.execute(ddl); + } + String sql = "UPSERT INTO " + fullTableName + + " ("+ pk1Name + ","+ pk2Name + ",PAYLOAD1,PAYLOAD2,PAYLOAD2) VALUES (?,?,?,?,?)"; + String pk1Value = generateUniqueName(); + String pk2Value = generateUniqueName(); + String payload1Value = generateUniqueName(); + String payload3Value = generateUniqueName(); + + try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) { + preparedStatement.setString(1, pk1Value); + preparedStatement.setString(2, pk2Value); + preparedStatement.setString(3, payload1Value); + preparedStatement.setString(4, "1234567890"); + preparedStatement.setString(5, payload3Value); + preparedStatement.execute(); + + try { + preparedStatement.setString(1, pk1Value); + preparedStatement.setString(2, pk2Value); + preparedStatement.setString(3, payload1Value); + preparedStatement.setString(4, "12345678901234567890"); + preparedStatement.setString(5, payload3Value); + preparedStatement.execute(); + if (storageScheme.equals("ONE_CELL_PER_COLUMN")) { + fail(); + } + } catch (SQLException e) { + if (!storageScheme.equals("ONE_CELL_PER_COLUMN")) { + fail(); + } else { + assertEquals(SQLExceptionCode.MAX_HBASE_CLIENT_KEYVALUE_MAXSIZE_EXCEEDED.getErrorCode(), + e.getErrorCode()); + 
assertTrue(e.getMessage().contains( + SQLExceptionCode.MAX_HBASE_CLIENT_KEYVALUE_MAXSIZE_EXCEEDED.getMessage())); + assertTrue(e.getMessage().contains( + connectionProperties.getProperty(QueryServices.HBASE_CLIENT_KEYVALUE_MAXSIZE))); + assertTrue(e.getMessage().contains(pk1Name)); + assertTrue(e.getMessage().contains(pk2Name)); + assertTrue(e.getMessage().contains(pk1Value)); + assertTrue(e.getMessage().contains(pk2Value)); + assertFalse(e.getMessage().contains(payload1Value)); + assertFalse(e.getMessage().contains(payload3Value)); + } + } + } + } + } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java index ce28e2c..83c575f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java @@ -84,6 +84,7 @@ import org.apache.phoenix.schema.ColumnRef; import org.apache.phoenix.schema.ConstraintViolationException; import org.apache.phoenix.schema.DelegateColumn; import org.apache.phoenix.schema.IllegalDataException; +import org.apache.phoenix.schema.MaxPhoenixColumnSizeExceededException; import org.apache.phoenix.schema.MetaDataClient; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PColumnImpl; @@ -120,9 +121,9 @@ import com.google.common.collect.Sets; public class UpsertCompiler { private static void setValues(byte[][] values, int[] pkSlotIndex, int[] columnIndexes, - PTable table, MultiRowMutationState mutation, - PhoenixStatement statement, boolean useServerTimestamp, IndexMaintainer maintainer, - byte[][] viewConstants, byte[] onDupKeyBytes, int numSplColumns) throws SQLException { + PTable table, MultiRowMutationState mutation, PhoenixStatement statement, boolean useServerTimestamp, + IndexMaintainer maintainer, byte[][] viewConstants, byte[] onDupKeyBytes, int numSplColumns, + int maxHBaseClientKeyValueSize) throws 
SQLException { long columnValueSize = 0; Map<PColumn,byte[]> columnValues = Maps.newHashMapWithExpectedSize(columnIndexes.length); byte[][] pkValues = new byte[table.getPKColumns().size()][]; @@ -139,6 +140,13 @@ public class UpsertCompiler { for (int i = 0, j = numSplColumns; j < values.length; j++, i++) { byte[] value = values[j]; PColumn column = table.getColumns().get(columnIndexes[i]); + if (value.length >= maxHBaseClientKeyValueSize && + table.getImmutableStorageScheme() == PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN) { + String rowkeyAndColumnInfo = getExceedMaxHBaseClientKeyValueAllowanceRowkeyAndColumnInfo( + values, columnIndexes, table, numSplColumns, column.getName().getString()); + throw new MaxPhoenixColumnSizeExceededException(rowkeyAndColumnInfo, maxHBaseClientKeyValueSize, value.length); + } + if (SchemaUtil.isPKColumn(column)) { pkValues[pkSlotIndex[i]] = value; if (SchemaUtil.getPKPosition(table, column) == table.getRowTimestampColPos()) { @@ -174,7 +182,27 @@ public class UpsertCompiler { } mutation.put(ptr, new RowMutationState(columnValues, columnValueSize, statement.getConnection().getStatementExecutionCounter(), rowTsColInfo, onDupKeyBytes)); } - + + public static String getExceedMaxHBaseClientKeyValueAllowanceRowkeyAndColumnInfo( + byte[][] values, int[] columnIndexes, PTable table, int numSplColumns, String columnName) { + StringBuilder sb = new StringBuilder(); + for (int i = 0, j = numSplColumns; j < values.length; j++, i++) { + byte[] value = values[j]; + PColumn column = table.getColumns().get(columnIndexes[i]); + if (SchemaUtil.isPKColumn(column)) { + if (sb.length() != 0) { + sb.append(" AND "); + } + sb.append(column.getName().toString() + "=" + Bytes.toString(value)); + } + } + return String.format("Upsert data to table %s on Column %s exceed max HBase client keyvalue size allowance, " + + "the rowkey is %s", + SchemaUtil.getTableName(table.getSchemaName().toString(), table.getTableName().toString()), + columnName, + 
sb.toString()); + } + public static MutationState upsertSelect(StatementContext childContext, TableRef tableRef, RowProjector projector, ResultIterator iterator, int[] columnIndexes, int[] pkSlotIndexes, boolean useServerTimestamp, @@ -187,6 +215,9 @@ public class UpsertCompiler { int maxSizeBytes = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE_BYTES); + int maxHBaseClientKeyValueSize = + services.getProps().getInt(QueryServices.HBASE_CLIENT_KEYVALUE_MAXSIZE, + QueryServicesOptions.DEFAULT_HBASE_CLIENT_KEYVALUE_MAXSIZE); int batchSize = Math.min(connection.getMutateBatchSize(), maxSize); // we automatically flush the mutations when either auto commit is enabled, or // the target table is transactional (in that case changes are not visible until we commit) @@ -253,7 +284,7 @@ public class UpsertCompiler { } setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement, useServerTimestamp, indexMaintainer, viewConstants, null, - numSplColumns); + numSplColumns, maxHBaseClientKeyValueSize); rowCount++; // Commit a batch if auto commit is true and we're at our batch size if (autoFlush && rowCount % batchSize == 0) { @@ -1267,7 +1298,11 @@ public class UpsertCompiler { indexMaintainer = table.getIndexMaintainer(parentTable, connection); viewConstants = IndexUtil.getViewConstants(parentTable); } - setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement, useServerTimestamp, indexMaintainer, viewConstants, onDupKeyBytes, 0); + int maxHBaseClientKeyValueSize = statement.getConnection().getQueryServices().getProps(). 
+ getInt(QueryServices.HBASE_CLIENT_KEYVALUE_MAXSIZE, + QueryServicesOptions.DEFAULT_HBASE_CLIENT_KEYVALUE_MAXSIZE); + setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement, useServerTimestamp, + indexMaintainer, viewConstants, onDupKeyBytes, 0, maxHBaseClientKeyValueSize); return new MutationState(tableRef, mutation, 0, maxSize, maxSizeBytes, connection); } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java index 3d48856..f50f5cd 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java @@ -35,6 +35,7 @@ import org.apache.phoenix.schema.ConcurrentTableMutationException; import org.apache.phoenix.schema.FunctionAlreadyExistsException; import org.apache.phoenix.schema.FunctionNotFoundException; import org.apache.phoenix.schema.IndexNotFoundException; +import org.apache.phoenix.schema.MaxPhoenixColumnSizeExceededException; import org.apache.phoenix.schema.MaxMutationSizeBytesExceededException; import org.apache.phoenix.schema.MaxMutationSizeExceededException; import org.apache.phoenix.schema.PTable; @@ -516,6 +517,16 @@ public enum SQLExceptionCode { NEW_INTERNAL_CONNECTION_THROTTLED(731, "410M1", "Could not create connection " + "because the internal connections already has the maximum number" + " of connections to the target cluster."), + MAX_HBASE_CLIENT_KEYVALUE_MAXSIZE_EXCEEDED(732, + "LIM03", "The Phoenix Column size is bigger than maximum " + + "HBase client key value allowed size for ONE_CELL_PER_COLUMN table, " + + "try upserting column in smaller value", new Factory() { + @Override + public SQLException newException(SQLExceptionInfo info) { + return new MaxPhoenixColumnSizeExceededException(info.getMessage(), info.getMaxPhoenixColumnSizeBytes(), + info.getPhoenixColumnSizeBytes()); + } + }), 
INSUFFICIENT_MEMORY(999, "50M01", "Unable to allocate enough memory."), HASH_JOIN_CACHE_NOT_FOUND(900, "HJ01", "Hash Join cache not found"), diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionInfo.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionInfo.java index 4681ac3..4d13bff 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionInfo.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionInfo.java @@ -42,6 +42,8 @@ public class SQLExceptionInfo { public static final String MUTATION_SIZE = "mutationSize"; public static final String MAX_MUTATION_SIZE_BYTES = "maxMutationSizeBytes"; public static final String MUTATION_SIZE_BYTES = "mutationSizeBytes"; + public static final String MAX_PHOENIX_COLUMN_SIZE_BYTES = "maxPhoenixColumnSizeBytes"; + public static final String PHOENIX_COLUMN_SIZE_BYTES = "phoenixColumnSizeBytes"; private final Throwable rootCause; private final SQLExceptionCode code; // Should always have one. 
@@ -55,6 +57,8 @@ public class SQLExceptionInfo { private final int mutationSize; private final long maxMutationSizeBytes; private final long mutationSizeBytes; + private final int phoenixColumnSizeBytes; + private final int maxPhoenixColumnSizeBytes; public static class Builder { @@ -70,6 +74,8 @@ public class SQLExceptionInfo { private int mutationSize; private long maxMutationSizeBytes; private long mutationSizeBytes; + private int phoenixColumnSizeBytes; + private int maxPhoenixColumnSizeBytes; public Builder(SQLExceptionCode code) { this.code = code; @@ -130,6 +136,16 @@ public class SQLExceptionInfo { return this; } + public Builder setPhoenixColumnSizeBytes(int phoenixColumnSizeBytes) { + this.phoenixColumnSizeBytes = phoenixColumnSizeBytes; + return this; + } + + public Builder setMaxPhoenixColumnSizeBytes(int maxPhoenixColumnSizeBytes) { + this.maxPhoenixColumnSizeBytes = maxPhoenixColumnSizeBytes; + return this; + } + public SQLExceptionInfo build() { return new SQLExceptionInfo(this); } @@ -153,6 +169,8 @@ public class SQLExceptionInfo { mutationSize = builder.mutationSize; maxMutationSizeBytes = builder.maxMutationSizeBytes; mutationSizeBytes = builder.mutationSizeBytes; + maxPhoenixColumnSizeBytes = builder.maxPhoenixColumnSizeBytes; + phoenixColumnSizeBytes = builder.phoenixColumnSizeBytes; } @Override @@ -188,6 +206,10 @@ public class SQLExceptionInfo { append(maxMutationSizeBytes); builder.append(" ").append(MUTATION_SIZE_BYTES).append("=").append(mutationSizeBytes); } + if (maxPhoenixColumnSizeBytes != 0) { + builder.append(" ").append(MAX_PHOENIX_COLUMN_SIZE_BYTES).append("=").append(maxPhoenixColumnSizeBytes); + builder.append(" ").append(PHOENIX_COLUMN_SIZE_BYTES).append("=").append(phoenixColumnSizeBytes); + } return builder.toString(); } @@ -243,4 +265,11 @@ public class SQLExceptionInfo { return mutationSizeBytes; } + public int getMaxPhoenixColumnSizeBytes() { + return maxPhoenixColumnSizeBytes; + } + + public int getPhoenixColumnSizeBytes() 
{ + return phoenixColumnSizeBytes; + } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index 5a601c2..f97e06d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -97,6 +97,7 @@ public interface QueryServices extends SQLCloseable { public static final String SCAN_CACHE_SIZE_ATTRIB = "hbase.client.scanner.caching"; public static final String MAX_MUTATION_SIZE_ATTRIB = "phoenix.mutate.maxSize"; public static final String MAX_MUTATION_SIZE_BYTES_ATTRIB = "phoenix.mutate.maxSizeBytes"; + public static final String HBASE_CLIENT_KEYVALUE_MAXSIZE = "hbase.client.keyvalue.maxsize"; public static final String MUTATE_BATCH_SIZE_ATTRIB = "phoenix.mutate.batchSize"; public static final String MUTATE_BATCH_SIZE_BYTES_ATTRIB = "phoenix.mutate.batchSizeBytes"; diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index 3773174..1f354a5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -148,6 +148,7 @@ public class QueryServicesOptions { public static final boolean DEFAULT_CALL_QUEUE_ROUND_ROBIN = true; public static final int DEFAULT_MAX_MUTATION_SIZE = 500000; public static final int DEFAULT_MAX_MUTATION_SIZE_BYTES = 104857600; // 100 Mb + public static final int DEFAULT_HBASE_CLIENT_KEYVALUE_MAXSIZE = 10485760; // 10 Mb public static final boolean DEFAULT_USE_INDEXES = true; // Use indexes public static final boolean DEFAULT_IMMUTABLE_ROWS = false; // Tables rows may be updated public static final boolean DEFAULT_DROP_METADATA = true; // Drop meta data also. 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MaxPhoenixColumnSizeExceededException.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MaxPhoenixColumnSizeExceededException.java
new file mode 100644
index 0000000..500ac15
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MaxPhoenixColumnSizeExceededException.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.schema;
+
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
+
+import java.sql.SQLException;
+
+/**
+ * Exception signalling that a single column value being upserted is at least as large as the
+ * HBase client key-value size limit; UpsertCompiler raises it for ONE_CELL_PER_COLUMN tables.
+ * The SQL state and vendor error code always come from
+ * {@link SQLExceptionCode#MAX_HBASE_CLIENT_KEYVALUE_MAXSIZE_EXCEEDED}.
+ */
+public class MaxPhoenixColumnSizeExceededException extends SQLException {
+    private static final long serialVersionUID = 1L;
+    private static final SQLExceptionCode CODE = SQLExceptionCode.MAX_HBASE_CLIENT_KEYVALUE_MAXSIZE_EXCEEDED;
+
+    /** Creates the exception with the generic message for this error code and no size details. */
+    public MaxPhoenixColumnSizeExceededException() {
+        super(new SQLExceptionInfo.Builder(CODE).build().toString(), CODE.getSQLState(), CODE.getErrorCode(), null);
+    }
+    /** Creates the exception with the limit, the offending size and caller-supplied row key/column text. */
+    public MaxPhoenixColumnSizeExceededException(String rowkeyAndColumnInfo, int maxMutationCellSizeBytes,
+                                                 int mutationCellSizeBytes) {
+        super(new SQLExceptionInfo.Builder(CODE).setMaxPhoenixColumnSizeBytes(maxMutationCellSizeBytes)
+                        .setPhoenixColumnSizeBytes(mutationCellSizeBytes).build().toString() + ". " + rowkeyAndColumnInfo,
+                CODE.getSQLState(), CODE.getErrorCode(), null);
+    }
+}
