This is an automated email from the ASF dual-hosted git repository. alexey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push: new f64ed2aac KUDU-1945: Support non unique primary key for Java client f64ed2aac is described below commit f64ed2aac40515ae46132a9bc3cdf7ad5b3f33de Author: wzhou-code <wz...@cloudera.com> AuthorDate: Fri Jan 13 17:48:24 2023 -0800 KUDU-1945: Support non unique primary key for Java client This patch adds new APIs to create ColumnSchema with non unique primary key for Java client. When a table with non unique primary key is created, Auto-Incrementing Column "auto_incrementing_id" will be added automatically to the table as the key column. The non-unique key columns and the auto-incrementing column together form the effective primary key. UPSERT/UPSERT_IGNORE operations are not supported now for Kudu table with auto-incrementing column due to limitation in Kudu server. Auto-Incrementing column cannot be added, removed or renamed with Alter Table APIs. Testing: - Added unit-test for Java client library. - Manually ran integration test with Impala for creating table with non unique primary key, and ran queries for operations: describe/insert/update/delete/upsert/CTAS/select/alter, etc. Passed Kudu related end-to-end tests. Change-Id: I7e2501d6b3d66f6466959e4f3f1ed0f5e08dfe5c Reviewed-on: http://gerrit.cloudera.org:8080/19384 Reviewed-by: Alexey Serbin <ale...@apache.org> Reviewed-by: Abhishek Chennaka <achenn...@cloudera.com> Tested-by: Alexey Serbin <ale...@apache.org> --- .../main/java/org/apache/kudu/ColumnSchema.java | 138 ++++++++++++- .../src/main/java/org/apache/kudu/Schema.java | 91 +++++++++ .../org/apache/kudu/client/AlterTableOptions.java | 25 +++ .../org/apache/kudu/client/AsyncKuduScanner.java | 2 + .../java/org/apache/kudu/client/KuduTable.java | 10 + .../org/apache/kudu/client/ProtobufHelper.java | 66 +++++-- .../java/org/apache/kudu/TestColumnSchema.java | 33 ++++ .../org/apache/kudu/client/TestKuduClient.java | 219 +++++++++++++++++++++ .../java/org/apache/kudu/client/TestKuduTable.java | 102 ++++++++++ .../java/org/apache/kudu/test/ClientTestUtil.java | 9 + 10 files changed, 676 insertions(+), 19 deletions(-) diff --git a/java/kudu-client/src/main/java/org/apache/kudu/ColumnSchema.java b/java/kudu-client/src/main/java/org/apache/kudu/ColumnSchema.java index 62e399550..26b679deb 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/ColumnSchema.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/ColumnSchema.java @@ -39,8 +39,10 @@ public class ColumnSchema { private final String name; private final Type type; private final boolean key; + private final boolean keyUnique; private final boolean nullable; private final boolean immutable; + private final boolean autoIncrementing; private final Object defaultValue; private final int desiredBlockSize; private final Encoding encoding; @@ -104,7 +106,8 @@ public class ColumnSchema { } } - private ColumnSchema(String name, Type type, boolean key, boolean nullable, boolean immutable, + private ColumnSchema(String name, Type type, boolean key, boolean keyUnique, + boolean nullable, boolean immutable, boolean autoIncrementing, Object defaultValue, int desiredBlockSize, Encoding encoding, CompressionAlgorithm compressionAlgorithm, ColumnTypeAttributes typeAttributes, Common.DataType wireType, @@ -112,8 +115,10 @@ public class ColumnSchema { this.name = name; this.type = type; this.key = key; + this.keyUnique = keyUnique; this.nullable = nullable; this.immutable = immutable; + this.autoIncrementing = autoIncrementing; this.defaultValue = defaultValue; this.desiredBlockSize = desiredBlockSize; this.encoding = encoding; @@ -148,6 +153,14 @@ public class ColumnSchema { return key; } + /** + * Answers if the key is unique + * @return true if the key is unique + */ + public boolean isKeyUnique() { + return keyUnique; + } + /** * Answers if the column can be set to null * @return true if it can be set to null, else false @@ -164,6 +177,14 @@ public class ColumnSchema { return immutable; } + /** + * Answers if the column is auto-incrementing column + * @return true if the column value is automatically assigned with incrementing value + */ + public boolean isAutoIncrementing() { + return autoIncrementing; + } + /** * The Java object representation of the default value that's read * @return the default read value @@ -239,6 +260,8 @@ public class ColumnSchema { return Objects.equals(name, that.name) && Objects.equals(type, that.type) && Objects.equals(key, that.key) && + Objects.equals(keyUnique, that.keyUnique) && + Objects.equals(autoIncrementing, that.autoIncrementing) && Objects.equals(typeAttributes, that.typeAttributes) && Objects.equals(comment, that.comment); } @@ -276,6 +299,7 @@ public class ColumnSchema { private final String name; private final Type type; private boolean key = false; + private boolean keyUnique = false; private boolean nullable = false; private boolean immutable = false; private Object defaultValue = null; @@ -290,8 +314,14 @@ public class ColumnSchema { * Constructor for the required parameters. * @param name column's name * @param type column's type + * @throws IllegalArgumentException if the column's name equals the reserved + * auto-incrementing column name */ public ColumnSchemaBuilder(String name, Type type) { + if (name.equalsIgnoreCase(Schema.getAutoIncrementingColumnName())) { + throw new IllegalArgumentException("Column name " + + Schema.getAutoIncrementingColumnName() + " is reserved by Kudu engine"); + } this.name = name; this.type = type; } @@ -304,6 +334,7 @@ public class ColumnSchema { this.name = that.name; this.type = that.type; this.key = that.key; + this.keyUnique = that.keyUnique; this.nullable = that.nullable; this.immutable = that.immutable; this.defaultValue = that.defaultValue; @@ -317,11 +348,25 @@ public class ColumnSchema { /** * Sets if the column is part of the row key. False by default. + * This function call overrides any previous key() and nonUniqueKey() call. * @param key a boolean that indicates if the column is part of the key * @return this instance */ public ColumnSchemaBuilder key(boolean key) { this.key = key; + this.keyUnique = key ? true : false; + return this; + } + + /** + * Sets if the column is part of the row non unique key. False by default. + * This function call overrides any previous key() and nonUniqueKey() call. + * @param key a boolean that indicates if the column is a part of the non unique key + * @return this instance + */ + public ColumnSchemaBuilder nonUniqueKey(boolean key) { + this.key = key; + this.keyUnique = false; return this; } @@ -456,10 +501,97 @@ public class ColumnSchema { } } - return new ColumnSchema(name, type, - key, nullable, immutable, defaultValue, + return new ColumnSchema(name, type, key, keyUnique, nullable, immutable, + /* autoIncrementing */false, defaultValue, desiredBlockSize, encoding, compressionAlgorithm, typeAttributes, wireType, comment); } } + + /** + * Builder for ColumnSchema of the auto-incrementing column. It's used internally in Kudu + * client library. + */ + @InterfaceAudience.Public + @InterfaceStability.Evolving + public static class AutoIncrementingColumnSchemaBuilder { + private final String name; + private final Type type; + private int desiredBlockSize = 0; + private Encoding encoding = null; + private CompressionAlgorithm compressionAlgorithm = null; + private Common.DataType wireType = null; + private String comment = ""; + + /** + * Constructor with default parameter values for {@link ColumnSchema}. + */ + public AutoIncrementingColumnSchemaBuilder() { + this.name = Schema.getAutoIncrementingColumnName(); + this.type = Schema.getAutoIncrementingColumnType(); + } + + /** + * Set the desired block size for this column. + */ + public AutoIncrementingColumnSchemaBuilder desiredBlockSize(int desiredBlockSize) { + this.desiredBlockSize = desiredBlockSize; + return this; + } + + /** + * Set the block encoding for this column. This function should be called when + * fetching column schema from Kudu server. + */ + public AutoIncrementingColumnSchemaBuilder encoding(Encoding encoding) { + this.encoding = encoding; + return this; + } + + /** + * Set the compression algorithm for this column. This function should be called + * when fetching column schema from Kudu server. + */ + public AutoIncrementingColumnSchemaBuilder compressionAlgorithm( + CompressionAlgorithm compressionAlgorithm) { + this.compressionAlgorithm = compressionAlgorithm; + return this; + } + + /** + * Allows an alternate {@link Common.DataType} to override the {@link Type} + * when serializing the ColumnSchema on the wire. + * This is useful for virtual columns specified by their type such as + * {@link Common.DataType#IS_DELETED}. + */ + @InterfaceAudience.Private + public AutoIncrementingColumnSchemaBuilder wireType(Common.DataType wireType) { + this.wireType = wireType; + return this; + } + + /** + * Set the comment for this column. + */ + public AutoIncrementingColumnSchemaBuilder comment(String comment) { + this.comment = comment; + return this; + } + + /** + * Builds a {@link ColumnSchema} for auto-incrementing column with passed parameters. + * @return a new {@link ColumnSchema} + */ + public ColumnSchema build() { + // Set the wire type if it wasn't explicitly set. + if (wireType == null) { + this.wireType = type.getDataType(null); + } + return new ColumnSchema(name, type, /* key */true, /* keyUnique */false, + /* nullable */false, /* immutable */false, + /* autoIncrementing */true, /* defaultValue */null, + desiredBlockSize, encoding, compressionAlgorithm, + /* typeAttributes */null, wireType, comment); + } + } } diff --git a/java/kudu-client/src/main/java/org/apache/kudu/Schema.java b/java/kudu-client/src/main/java/org/apache/kudu/Schema.java index f07d0c616..5ff514bf1 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/Schema.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/Schema.java @@ -39,6 +39,13 @@ import org.apache.kudu.client.PartialRow; @InterfaceStability.Evolving public class Schema { + /* + * Column name and type of auto_incrementing_id column, which is added by Kudu engine + * automatically if the primary key is not unique. + */ + private static final String AUTO_INCREMENTING_ID_COL_NAME = "auto_incrementing_id"; + private static final Type AUTO_INCREMENTING_ID_COL_TYPE = Type.INT64; + /** * Mapping of column index to column. */ @@ -71,8 +78,10 @@ public class Schema { private final int varLengthColumnCount; private final int rowSize; + private final boolean isKeyUnique; private final boolean hasNullableColumns; private final boolean hasImmutableColumns; + private final boolean hasAutoIncrementingColumn; private final int isDeletedIndex; private static final int NO_IS_DELETED_INDEX = -1; @@ -106,6 +115,54 @@ public class Schema { "Schema must be constructed with all column IDs, or none."); } + boolean isKeyFound = false; + boolean isKeyUnique = false; + boolean hasAutoIncrementing = false; + int keyColumnCount = 0; + int maxColumnId = Integer.MIN_VALUE; + // Check if auto-incrementing column should be added into the input columns list + for (int index = 0; index < columns.size(); index++) { + final ColumnSchema column = columns.get(index); + if (column.isKey()) { + keyColumnCount++; + if (!isKeyFound) { + isKeyFound = true; + isKeyUnique = column.isKeyUnique(); + } else if (isKeyUnique != column.isKeyUnique()) { + throw new IllegalArgumentException( + "Mixture of unique key and non unique key in a table"); + } + } + if (column.isAutoIncrementing()) { + if (!hasAutoIncrementing) { + hasAutoIncrementing = true; + } else { + throw new IllegalArgumentException( + "More than one columns are set as auto-incrementing columns"); + } + } + if (hasColumnIds && maxColumnId < columnIds.get(index).intValue()) { + maxColumnId = columnIds.get(index).intValue(); + } + } + // Add auto-incrementing column into input columns list if the primary key is not + // unique and auto-incrementing column has not been created. + if (keyColumnCount > 0 && !isKeyUnique && !hasAutoIncrementing) { + // Build auto-incrementing column + ColumnSchema autoIncrementingColumn = + new ColumnSchema.AutoIncrementingColumnSchemaBuilder().build(); + // Make a copy of mutable list of columns, then add an auto-incrementing + // column after the columns marked as key columns. + columns = new ArrayList<>(columns); + Preconditions.checkNotNull(columns); + columns.add(keyColumnCount, autoIncrementingColumn); + if (hasColumnIds) { + columnIds = new ArrayList<>(columnIds); + columnIds.add(keyColumnCount, maxColumnId + 1); + } + hasAutoIncrementing = true; + } + this.columnsByIndex = ImmutableList.copyOf(columns); int varLenCnt = 0; this.columnOffsets = new int[columns.size()]; @@ -154,8 +211,10 @@ public class Schema { this.varLengthColumnCount = varLenCnt; this.rowSize = getRowSize(this.columnsByIndex); + this.isKeyUnique = isKeyUnique; this.hasNullableColumns = hasNulls; this.hasImmutableColumns = hasImmutables; + this.hasAutoIncrementingColumn = hasAutoIncrementing; this.isDeletedIndex = isDeletedIndex; } @@ -294,6 +353,38 @@ public class Schema { return primaryKeyColumns; } + /** + * Answers if the primary key is unique for the table + * @return true if the key is unique + */ + public boolean isPrimaryKeyUnique() { + return this.isKeyUnique; + } + + /** + * Tells if there's auto-incrementing column + * @return true if there's auto-incrementing column, else false. + */ + public boolean hasAutoIncrementingColumn() { + return this.hasAutoIncrementingColumn; + } + + /** + * Get the name of the auto-incrementing column + * @return column name of the auto-incrementing column. + */ + public static String getAutoIncrementingColumnName() { + return AUTO_INCREMENTING_ID_COL_NAME; + } + + /** + * Get the type of the auto-incrementing column + * @return type of the auto-incrementing column. + */ + public static Type getAutoIncrementingColumnType() { + return AUTO_INCREMENTING_ID_COL_TYPE; + } + /** * Get a schema that only contains the columns which are part of the key * @return new schema with only the keys diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AlterTableOptions.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AlterTableOptions.java index 026c3f6ce..f3d7e4637 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/AlterTableOptions.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AlterTableOptions.java @@ -34,6 +34,7 @@ import org.apache.yetus.audience.InterfaceStability; import org.apache.kudu.ColumnSchema; import org.apache.kudu.Common; +import org.apache.kudu.Schema; import org.apache.kudu.Type; import org.apache.kudu.client.ProtobufHelper.SchemaPBConversionFlags; import org.apache.kudu.master.Master; @@ -84,6 +85,10 @@ public class AlterTableOptions { * @return this instance */ public AlterTableOptions addColumn(ColumnSchema colSchema) { + if (colSchema.getName().equalsIgnoreCase(Schema.getAutoIncrementingColumnName())) { + throw new IllegalArgumentException("Column name " + + Schema.getAutoIncrementingColumnName() + " is reserved by Kudu engine"); + } if (!colSchema.isNullable() && colSchema.getDefaultValue() == null) { throw new IllegalArgumentException("A new non-null column must have a default value"); } @@ -140,6 +145,10 @@ public class AlterTableOptions { * @return this instance */ public AlterTableOptions dropColumn(String name) { + if (name.equalsIgnoreCase(Schema.getAutoIncrementingColumnName())) { + throw new IllegalArgumentException("Cannot remove auto-incrementing column " + + Schema.getAutoIncrementingColumnName()); + } AlterTableRequestPB.Step.Builder step = pb.addAlterSchemaStepsBuilder(); step.setType(AlterTableRequestPB.StepType.DROP_COLUMN); step.setDropColumn(AlterTableRequestPB.DropColumn.newBuilder().setName(name)); @@ -153,6 +162,10 @@ public class AlterTableOptions { * @return this instance */ public AlterTableOptions renameColumn(String oldName, String newName) { + if (oldName.equalsIgnoreCase(Schema.getAutoIncrementingColumnName())) { + throw new IllegalArgumentException("Cannot rename auto-incrementing column " + + Schema.getAutoIncrementingColumnName()); + } // For backwards compatibility, this uses the RENAME_COLUMN step type. AlterTableRequestPB.Step.Builder step = pb.addAlterSchemaStepsBuilder(); step.setType(AlterTableRequestPB.StepType.RENAME_COLUMN); @@ -167,6 +180,10 @@ public class AlterTableOptions { * @return this instance */ public AlterTableOptions removeDefault(String name) { + if (name.equalsIgnoreCase(Schema.getAutoIncrementingColumnName())) { + throw new IllegalArgumentException("Auto-incrementing column " + + Schema.getAutoIncrementingColumnName() + " does not have default value"); + } AlterTableRequestPB.Step.Builder step = pb.addAlterSchemaStepsBuilder(); step.setType(AlterTableRequestPB.StepType.ALTER_COLUMN); AlterTableRequestPB.AlterColumn.Builder alterBuilder = @@ -185,6 +202,10 @@ public class AlterTableOptions { * @return this instance */ public AlterTableOptions changeDefault(String name, Object newDefault) { + if (name.equalsIgnoreCase(Schema.getAutoIncrementingColumnName())) { + throw new IllegalArgumentException("Cannot set default value for " + + "auto-incrementing column " + Schema.getAutoIncrementingColumnName()); + } if (newDefault == null) { throw new IllegalArgumentException("newDefault cannot be null: " + "use removeDefault to clear a default value"); @@ -486,6 +507,10 @@ public class AlterTableOptions { * @return this instance */ public AlterTableOptions changeImmutable(String name, boolean immutable) { + if (name.equalsIgnoreCase(Schema.getAutoIncrementingColumnName())) { + throw new IllegalArgumentException("Cannot change immutable for " + + "auto-incrementing column " + Schema.getAutoIncrementingColumnName()); + } AlterTableRequestPB.Step.Builder step = pb.addAlterSchemaStepsBuilder(); step.setType(AlterTableRequestPB.StepType.ALTER_COLUMN); AlterTableRequestPB.AlterColumn.Builder alterBuilder = diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java index 49c188901..4929dd4d8 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java @@ -370,6 +370,8 @@ public final class AsyncKuduScanner { columns.add(getStrippedColumnSchema(originalColumn)); } } else { + // By default, a scanner is created with all columns including auto-incrementing + // column if projected columns are not specified. columns.addAll(table.getSchema().getColumns()); } // This is a diff scan so add the IS_DELETED column. diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTable.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTable.java index 1346231f3..97672ecc1 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTable.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduTable.java @@ -185,8 +185,13 @@ public class KuduTable { /** * Get a new upsert configured with this table's schema. The returned object should not be reused. * @return an upsert with this table's schema + * @throws UnsupportedOperationException if the table has auto-incrementing column */ public Upsert newUpsert() { + if (schema.hasAutoIncrementingColumn()) { + throw new UnsupportedOperationException( + "Tables with auto-incrementing column do not support UPSERT operations"); + } return new Upsert(this); } @@ -195,8 +200,13 @@ public class KuduTable { * updating immutable cells in a row. This is useful when upserting rows in a table with immutable * columns. * @return an upsert with this table's schema + * @throws UnsupportedOperationException if the table has auto-incrementing column */ public UpsertIgnore newUpsertIgnore() { + if (schema.hasAutoIncrementingColumn()) { + throw new UnsupportedOperationException( + "Tables with auto-incrementing column do not support UPSERT_IGNORE operations"); + } return new UpsertIgnore(this); } diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ProtobufHelper.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ProtobufHelper.java index b7e0f77c6..a9c015bb0 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/ProtobufHelper.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ProtobufHelper.java @@ -108,6 +108,7 @@ public class ProtobufHelper { .setIsKey(column.isKey()) .setIsNullable(column.isNullable()) .setImmutable(column.isImmutable()) + .setIsAutoIncrementing(column.isAutoIncrementing()) .setCfileBlockSize(column.getDesiredBlockSize()); if (!flags.contains(SchemaPBConversionFlags.SCHEMA_PB_WITHOUT_ID) && colId >= 0) { @@ -150,26 +151,47 @@ public class ProtobufHelper { } public static ColumnSchema pbToColumnSchema(Common.ColumnSchemaPB pb) { + return pbToColumnSchema(pb, true); + } + + public static ColumnSchema pbToColumnSchema(Common.ColumnSchemaPB pb, + boolean isKeyUnique) { + ColumnSchema.Encoding encoding = ColumnSchema.Encoding.valueOf(pb.getEncoding().name()); + ColumnSchema.CompressionAlgorithm compressionAlgorithm = + ColumnSchema.CompressionAlgorithm.valueOf(pb.getCompression().name()); + int desiredBlockSize = pb.getCfileBlockSize(); + + if (pb.getIsAutoIncrementing()) { + // Set encoding, compression algorithm, block size and comment from 'pb' parameter + return new ColumnSchema.AutoIncrementingColumnSchemaBuilder() + .encoding(encoding) + .compressionAlgorithm(compressionAlgorithm) + .desiredBlockSize(desiredBlockSize) + .comment(pb.getComment()) + .build(); + } + Type type = Type.getTypeForDataType(pb.getType()); ColumnTypeAttributes typeAttributes = pb.hasTypeAttributes() ? pbToColumnTypeAttributes(pb.getTypeAttributes()) : null; Object defaultValue = pb.hasWriteDefaultValue() ? byteStringToObject(type, typeAttributes, pb.getWriteDefaultValue()) : null; - ColumnSchema.Encoding encoding = ColumnSchema.Encoding.valueOf(pb.getEncoding().name()); - ColumnSchema.CompressionAlgorithm compressionAlgorithm = - ColumnSchema.CompressionAlgorithm.valueOf(pb.getCompression().name()); - int desiredBlockSize = pb.getCfileBlockSize(); - return new ColumnSchema.ColumnSchemaBuilder(pb.getName(), type) - .key(pb.getIsKey()) - .nullable(pb.getIsNullable()) - .immutable(pb.getImmutable()) - .defaultValue(defaultValue) - .encoding(encoding) - .compressionAlgorithm(compressionAlgorithm) - .desiredBlockSize(desiredBlockSize) - .typeAttributes(typeAttributes) - .comment(pb.getComment()) - .build(); + ColumnSchema.ColumnSchemaBuilder csb = + new ColumnSchema.ColumnSchemaBuilder(pb.getName(), type); + if (pb.getIsKey() && isKeyUnique) { + csb.key(true); + } else { + csb.nonUniqueKey(pb.getIsKey()); + } + return csb.nullable(pb.getIsNullable()) + .immutable(pb.getImmutable()) + .defaultValue(defaultValue) + .encoding(encoding) + .compressionAlgorithm(compressionAlgorithm) + .desiredBlockSize(desiredBlockSize) + .typeAttributes(typeAttributes) + .comment(pb.getComment()) + .build(); } public static ColumnTypeAttributes pbToColumnTypeAttributes(Common.ColumnTypeAttributesPB pb) { @@ -188,10 +210,22 @@ public class ProtobufHelper { } public static Schema pbToSchema(Common.SchemaPB schema) { + // Since ColumnSchema.keyUnique in run-time structures is not persistent in Kudu + // server, we need to find if the table has auto-incrementing column first, and set + // all key columns as non unique key columns if the table has auto-incrementing + // column. + boolean hasAutoIncrementing = false; + for (Common.ColumnSchemaPB columnPb : schema.getColumnsList()) { + if (columnPb.getIsAutoIncrementing()) { + hasAutoIncrementing = true; + break; + } + } List<ColumnSchema> columns = new ArrayList<>(schema.getColumnsCount()); List<Integer> columnIds = new ArrayList<>(schema.getColumnsCount()); for (Common.ColumnSchemaPB columnPb : schema.getColumnsList()) { - columns.add(pbToColumnSchema(columnPb)); + // Key is not unique if hasAutoIncrementing is true. + columns.add(pbToColumnSchema(columnPb, !hasAutoIncrementing)); int id = columnPb.getId(); if (id < 0) { throw new IllegalArgumentException("Illegal column ID: " + id); diff --git a/java/kudu-client/src/test/java/org/apache/kudu/TestColumnSchema.java b/java/kudu-client/src/test/java/org/apache/kudu/TestColumnSchema.java index a3385a04e..c1fd3c9e5 100644 --- a/java/kudu-client/src/test/java/org/apache/kudu/TestColumnSchema.java +++ b/java/kudu-client/src/test/java/org/apache/kudu/TestColumnSchema.java @@ -25,6 +25,7 @@ import org.junit.Rule; import org.junit.Test; import org.junit.function.ThrowingRunnable; +import org.apache.kudu.ColumnSchema.AutoIncrementingColumnSchemaBuilder; import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder; import org.apache.kudu.test.junit.RetryRule; import org.apache.kudu.util.CharUtil; @@ -69,8 +70,17 @@ public class TestColumnSchema { ColumnSchema isKey = new ColumnSchemaBuilder("col1", Type.STRING) .key(true) .build(); + Assert.assertTrue(isKey.isKey()); assertNotEquals(stringCol1, isKey); + // Difference between key and nonUniqueKey + ColumnSchema isNonUniqueKey = new ColumnSchemaBuilder("col1", Type.STRING) + .nonUniqueKey(true) + .build(); + Assert.assertTrue(isNonUniqueKey.isKey()); + Assert.assertFalse(isNonUniqueKey.isKeyUnique()); + assertNotEquals(isKey, isNonUniqueKey); + // Different by type ColumnSchema isInt = new ColumnSchemaBuilder("col1", Type.INT32) .build(); @@ -126,4 +136,27 @@ public class TestColumnSchema { .contains("VARCHAR's length must be set and between 1 and 65535")); } + @Test + public void testAutoIncrementing() throws Exception { + // Create auto-incrementing column with AutoIncrementingColumnSchemaBuilder + ColumnSchema autoIncrementing = new AutoIncrementingColumnSchemaBuilder().build(); + Assert.assertTrue(autoIncrementing.isAutoIncrementing()); + assertEquals(Schema.getAutoIncrementingColumnType(), autoIncrementing.getType()); + Assert.assertTrue(autoIncrementing.isKey()); + Assert.assertFalse(autoIncrementing.isKeyUnique()); + Assert.assertFalse(autoIncrementing.isNullable()); + Assert.assertFalse(autoIncrementing.isImmutable()); + assertEquals(null, autoIncrementing.getDefaultValue()); + + // Create column with auto-incrementing column name with ColumnSchemaBuilder + Throwable thrown = Assert.assertThrows(IllegalArgumentException.class, new ThrowingRunnable() { + @Override + public void run() throws Exception { + new ColumnSchemaBuilder(Schema.getAutoIncrementingColumnName(), + Schema.getAutoIncrementingColumnType()).build(); + } + }); + Assert.assertTrue(thrown.getMessage().contains("Column name " + + Schema.getAutoIncrementingColumnName() + " is reserved by Kudu engine")); + } } diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java index 01965daff..917d5aed2 100644 --- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java +++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java @@ -29,6 +29,7 @@ import static org.apache.kudu.test.ClientTestUtil.createManyVarcharsSchema; import static org.apache.kudu.test.ClientTestUtil.createSchemaWithBinaryColumns; import static org.apache.kudu.test.ClientTestUtil.createSchemaWithDateColumns; import static org.apache.kudu.test.ClientTestUtil.createSchemaWithDecimalColumns; +import static org.apache.kudu.test.ClientTestUtil.createSchemaWithNonUniqueKey; import static org.apache.kudu.test.ClientTestUtil.createSchemaWithTimestampColumns; import static org.apache.kudu.test.ClientTestUtil.getBasicCreateTableOptions; import static org.apache.kudu.test.ClientTestUtil.getBasicTableOptionsWithNonCoveredRange; @@ -64,6 +65,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; import com.stumbleupon.async.Deferred; import org.junit.Before; import org.junit.Rule; @@ -782,6 +784,223 @@ public class TestKuduClient { } } + /** + * Test creating a table with non unique primary key in the table schema. + */ + @Test(timeout = 100000) + public void testCreateTableWithNonUniquePrimaryKeys() throws Exception { + // Create a schema with non unique primary key column + Schema schema = createSchemaWithNonUniqueKey(); + assertFalse(schema.isPrimaryKeyUnique()); + // Verify auto-incrementing column is in the schema + assertTrue(schema.hasAutoIncrementingColumn()); + assertEquals(3, schema.getColumnCount()); + assertEquals(2, schema.getPrimaryKeyColumnCount()); + assertEquals(1, schema.getColumnIndex(Schema.getAutoIncrementingColumnName())); + // Create a table + client.createTable(TABLE_NAME, schema, getBasicCreateTableOptions()); + + KuduSession session = client.newSession(); + KuduTable table = client.openTable(TABLE_NAME); + + // Verify that the primary key is not unique, and an auto-incrementing column is + // added as key column in the position after all key columns. + schema = table.getSchema(); + assertFalse(schema.isPrimaryKeyUnique()); + assertTrue(schema.hasAutoIncrementingColumn()); + assertEquals(3, schema.getColumnCount()); + assertEquals(2, schema.getPrimaryKeyColumnCount()); + assertEquals(1, schema.getColumnIndex(Schema.getAutoIncrementingColumnName())); + assertTrue(schema.getColumn(Schema.getAutoIncrementingColumnName()).isKey()); + assertTrue(schema.getColumn( + Schema.getAutoIncrementingColumnName()).isAutoIncrementing()); + + // Insert rows into the table without assigning values for the auto-incrementing + // column. + for (int i = 0; i < 3; i++) { + Insert insert = table.newInsert(); + PartialRow row = insert.getRow(); + row.addInt("key", i); + row.addInt("c1", i * 10); + session.apply(insert); + } + session.flush(); + + // Scan all the rows in the table with all columns. + // Verify that the auto-incrementing column is included in the rows. + List<String> rowStrings = scanTableToStrings(table); + assertEquals(3, rowStrings.size()); + for (int i = 0; i < rowStrings.size(); i++) { + StringBuilder expectedRow = new StringBuilder(); + expectedRow.append(String.format("INT32 key=%d, INT64 %s=%d, INT32 c1=%d", + i, Schema.getAutoIncrementingColumnName(), i + 1, i * 10)); + assertEquals(expectedRow.toString(), rowStrings.get(i)); + } + + // Update "c1" column of the first row with "key" and auto-incrementing columns. + Update update = table.newUpdate(); + PartialRow row = update.getRow(); + row.addInt(schema.getColumnByIndex(0).getName(), 0); + row.addLong(schema.getColumnByIndex(1).getName(), 1); + row.addInt(schema.getColumnByIndex(2).getName(), 100); + session.apply(update); + session.flush(); + + // Scan all the rows in the table without the auto-incrementing column. + // Verify that "c1" column of the first row is updated. + KuduScanner.KuduScannerBuilder scanBuilder = client.newScannerBuilder(table); + KuduScanner scanner = + scanBuilder.setProjectedColumnNames(Lists.newArrayList("key", "c1")).build(); + rowStrings.clear(); + for (RowResult r : scanner) { + rowStrings.add(r.rowToString()); + } + Collections.sort(rowStrings); + assertEquals(3, rowStrings.size()); + for (int i = 0; i < rowStrings.size(); i++) { + StringBuilder expectedRow = new StringBuilder(); + if (i == 0) { + expectedRow.append(String.format("INT32 key=0, INT32 c1=100")); + } else { + expectedRow.append(String.format("INT32 key=%d, INT32 c1=%d", i, i * 10)); + } + assertEquals(expectedRow.toString(), rowStrings.get(i)); + } + + // Delete the first row with "key" and auto-incrementing columns. + // Verify that number of rows is decreased by 1. + Delete delete = table.newDelete(); + row = delete.getRow(); + row.addInt(schema.getColumnByIndex(0).getName(), 0); + row.addLong(schema.getColumnByIndex(1).getName(), 1); + session.apply(delete); + session.flush(); + assertEquals(2, countRowsInScan(client.newScannerBuilder(table).build())); + + // Check that we can delete the table. + client.deleteTable(TABLE_NAME); + } + + /** + * Test operations for table with auto-incrementing column. + */ + @Test(timeout = 100000) + public void testTableWithAutoIncrementingColumn() throws Exception { + // Create a schema with non unique primary key column + Schema schema = createSchemaWithNonUniqueKey(); + assertFalse(schema.isPrimaryKeyUnique()); + // Verify auto-incrementing column is in the schema + assertTrue(schema.hasAutoIncrementingColumn()); + assertEquals(3, schema.getColumnCount()); + assertEquals(2, schema.getPrimaryKeyColumnCount()); + // Create a table + client.createTable(TABLE_NAME, schema, getBasicCreateTableOptions()); + + final KuduSession session = client.newSession(); + KuduTable table = client.openTable(TABLE_NAME); + schema = table.getSchema(); + assertTrue(schema.hasAutoIncrementingColumn()); + + // Verify that UPSERT is not allowed for table with auto-incrementing column + try { + table.newUpsert(); + fail("UPSERT on table with auto-incrementing column"); + } catch (UnsupportedOperationException e) { + assertTrue(e.getMessage().contains( + "Tables with auto-incrementing column do not support UPSERT operations")); + } + + // Verify that UPSERT_IGNORE is not allowed for table with auto-incrementing column + try { + table.newUpsertIgnore(); + fail("UPSERT_IGNORE on table with auto-incrementing column"); + } catch (UnsupportedOperationException e) { + assertTrue(e.getMessage().contains( + "Tables with auto-incrementing column do not support UPSERT_IGNORE operations")); + } + + // Change desired block size for auto-incrementing column + client.alterTable(TABLE_NAME, new AlterTableOptions().changeDesiredBlockSize( + Schema.getAutoIncrementingColumnName(), 1)); + // Change encoding for auto-incrementing column + client.alterTable(TABLE_NAME, new AlterTableOptions().changeEncoding( + Schema.getAutoIncrementingColumnName(), ColumnSchema.Encoding.PLAIN_ENCODING)); + // Change compression algorithm for auto-incrementing column + client.alterTable(TABLE_NAME, new AlterTableOptions().changeCompressionAlgorithm( + Schema.getAutoIncrementingColumnName(), ColumnSchema.CompressionAlgorithm.NO_COMPRESSION)); + session.flush(); + + // Verify that auto-incrementing column cannot be added + try { + client.alterTable(TABLE_NAME, new AlterTableOptions().addColumn( + Schema.getAutoIncrementingColumnName(), Schema.getAutoIncrementingColumnType(), 0)); + fail("Add auto-incrementing column"); + } catch (IllegalArgumentException e) { + assertTrue(e.getMessage().contains("Column name " + + Schema.getAutoIncrementingColumnName() + " is reserved by Kudu engine")); + } + try { + client.alterTable(TABLE_NAME, new AlterTableOptions().addColumn( + new ColumnSchema.AutoIncrementingColumnSchemaBuilder().build())); + fail("Add auto-incrementing column"); + } catch (IllegalArgumentException e) { + assertTrue(e.getMessage().contains("Column name " + + Schema.getAutoIncrementingColumnName() + " is reserved by Kudu engine")); + } + + // Verify that auto-incrementing column cannot be removed + try { + client.alterTable(TABLE_NAME, new AlterTableOptions().dropColumn( + Schema.getAutoIncrementingColumnName())); + fail("Drop auto-incrementing column"); + } catch (IllegalArgumentException e) { + assertTrue(e.getMessage().contains("Cannot remove auto-incrementing column " + + Schema.getAutoIncrementingColumnName())); + } + + // Verify that auto-incrementing column cannot be renamed + try { + client.alterTable(TABLE_NAME, new AlterTableOptions().renameColumn( + Schema.getAutoIncrementingColumnName(), "new_auto_incrementing")); + fail("Rename auto-incrementing column"); + } catch (IllegalArgumentException e) { + assertTrue(e.getMessage().contains("Cannot rename auto-incrementing column " + + Schema.getAutoIncrementingColumnName())); + } + + // Verify that auto-incrementing column cannot be changed by removing default + try { + client.alterTable(TABLE_NAME, new AlterTableOptions().removeDefault( + Schema.getAutoIncrementingColumnName())); + fail("Remove default value for auto-incrementing column"); + } catch (IllegalArgumentException e) { + assertTrue(e.getMessage().contains("Auto-incrementing column " + + Schema.getAutoIncrementingColumnName() + " does not have default value")); + } + + // Verify that auto-incrementing column cannot be changed with default value + try { + client.alterTable(TABLE_NAME, new AlterTableOptions().changeDefault( + Schema.getAutoIncrementingColumnName(), 0)); + fail("Change default value for auto-incrementing column"); + } catch (IllegalArgumentException e) { + assertTrue(e.getMessage().contains("Cannot set default value for " + + "auto-incrementing column " + Schema.getAutoIncrementingColumnName())); + } + + // Verify that auto-incrementing column cannot be changed for its immutable + try { + client.alterTable(TABLE_NAME, new AlterTableOptions().changeImmutable( + Schema.getAutoIncrementingColumnName(), true)); + fail("Change immutable for auto-incrementing column"); + } catch (IllegalArgumentException e) { + assertTrue(e.getMessage().contains("Cannot change immutable for " + + "auto-incrementing column " + Schema.getAutoIncrementingColumnName())); + } + + client.deleteTable(TABLE_NAME); + } + /** * Test inserting and retrieving rows from a table that has a range partition * with custom hash schema. diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTable.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTable.java index 72fb70c90..6f5469241 100644 --- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTable.java +++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduTable.java @@ -2560,4 +2560,106 @@ public class TestKuduTable { ".* server sent error unsupported feature flags")); } } + + /** + * Test creating table schemas with non unique primary key columns and + * auto-incrementing columns. + */ + @Test(timeout = 100000) + public void testCreateSchemaWithNonUniquePrimaryKeys() throws Exception { + // Create a schema with two non unique primary key columns and + // verify the resulting table's schema. + ArrayList<ColumnSchema> columns = new ArrayList<>(); + columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32) + .nonUniqueKey(true).build()); + columns.add(new ColumnSchema.ColumnSchemaBuilder("key2", Type.INT64) + .nonUniqueKey(true).build()); + columns.add(new ColumnSchema.ColumnSchemaBuilder("c1", Type.INT32) + .nullable(true).build()); + Schema schema = new Schema(columns); + assertFalse(schema.isPrimaryKeyUnique()); + assertTrue(schema.hasAutoIncrementingColumn()); + assertEquals(4, schema.getColumnCount()); + assertEquals(3, schema.getPrimaryKeyColumnCount()); + client.createTable(tableName, schema, getBasicCreateTableOptions()); + KuduTable table = client.openTable(tableName); + schema = table.getSchema(); + assertFalse(schema.isPrimaryKeyUnique()); + assertTrue(schema.hasAutoIncrementingColumn()); + assertEquals(4, schema.getColumnCount()); + assertEquals(3, schema.getPrimaryKeyColumnCount()); + client.deleteTable(tableName); + + // Create a schema with non unique primary key column and unique primary key column + columns.clear(); + columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32) + .nonUniqueKey(true).build()); + columns.add(new ColumnSchema.ColumnSchemaBuilder("key2", Type.INT32) + .key(true).build()); + columns.add(new ColumnSchema.ColumnSchemaBuilder("c1", Type.INT32) + .nullable(true).build()); + try { + new Schema(columns); + fail("Schema with mixture of unique key and non unique key"); + } catch (IllegalArgumentException e) { + assertTrue(e.getMessage().contains( + "Mixture of unique key and non unique key in a table")); + } + + // Create a schema with an auto-incrementing column which is marked as non unique + // primary key and verify the resulting table's schema. + columns.clear(); + columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32) + .nonUniqueKey(true).build()); + columns.add(new ColumnSchema.AutoIncrementingColumnSchemaBuilder().build()); + columns.add(new ColumnSchema.ColumnSchemaBuilder("c1", Type.INT32) + .nullable(true).build()); + schema = new Schema(columns); + assertTrue(schema.hasAutoIncrementingColumn()); + assertFalse(schema.isPrimaryKeyUnique()); + assertEquals(3, schema.getColumnCount()); + assertEquals(2, schema.getPrimaryKeyColumnCount()); + client.createTable(tableName, schema, getBasicCreateTableOptions()); + table = client.openTable(tableName); + schema = table.getSchema(); + assertTrue(schema.hasAutoIncrementingColumn()); + assertFalse(schema.isPrimaryKeyUnique()); + assertEquals(3, schema.getColumnCount()); + assertEquals(2, schema.getPrimaryKeyColumnCount()); + client.deleteTable(tableName); + + // Create a schema with a single auto-incrementing column which is marked as non + // unique primary key, and verify the resulting table's schema. + columns.clear(); + columns.add(new ColumnSchema.AutoIncrementingColumnSchemaBuilder().build()); + schema = new Schema(columns); + assertTrue(schema.hasAutoIncrementingColumn()); + assertFalse(schema.isPrimaryKeyUnique()); + assertEquals(1, schema.getColumnCount()); + assertEquals(1, schema.getPrimaryKeyColumnCount()); + CreateTableOptions builder = new CreateTableOptions(); + builder.setRangePartitionColumns(ImmutableList.of(Schema.getAutoIncrementingColumnName())); + client.createTable(tableName, schema, builder); + table = client.openTable(tableName); + schema = table.getSchema(); + assertTrue(schema.hasAutoIncrementingColumn()); + assertFalse(schema.isPrimaryKeyUnique()); + assertEquals(1, schema.getColumnCount()); + assertEquals(1, schema.getPrimaryKeyColumnCount()); + client.deleteTable(tableName); + + // Create a schema with two auto-incrementing columns + columns.clear(); + columns.add(new ColumnSchema.AutoIncrementingColumnSchemaBuilder().build()); + columns.add(new ColumnSchema.AutoIncrementingColumnSchemaBuilder().build()); + columns.add(new ColumnSchema.ColumnSchemaBuilder("c1", Type.INT32) + .nullable(true).build()); + try { + new Schema(columns); + fail("Schema with two auto-incrementing columns"); + } catch (IllegalArgumentException e) { + assertTrue(e.getMessage().contains( + "More than one columns are set as auto-incrementing columns")); + } + } } diff --git a/java/kudu-test-utils/src/main/java/org/apache/kudu/test/ClientTestUtil.java b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/ClientTestUtil.java index 8cb7d28b8..72b7a2690 100644 --- a/java/kudu-test-utils/src/main/java/org/apache/kudu/test/ClientTestUtil.java +++ b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/ClientTestUtil.java @@ -497,4 +497,13 @@ public abstract class ClientTestUtil { .nullable(true).immutable(true).build()); return new Schema(columns); } + + public static Schema createSchemaWithNonUniqueKey() { + ArrayList<ColumnSchema> columns = new ArrayList<>(); + columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32).nonUniqueKey(true) + .build()); + columns.add(new ColumnSchema.ColumnSchemaBuilder("c1", Type.INT32).nullable(true) + .build()); + return new Schema(columns); + } }