This is an automated email from the ASF dual-hosted git repository. laiyingchun pushed a commit to branch branch-1.17.x in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 3eaa205213ff5c6e885b9fc0f3cfb9098d3e2231 Author: Abhishek Chennaka <achenn...@cloudera.com> AuthorDate: Thu Jun 15 21:39:50 2023 -0700 KUDU-1945 Backup/restore for tables with auto incrementing columns This patch adds the ability to back up and restore tables containing auto-incrementing columns. A simple test case is also added to test the functionality. Without this patch, any attempt to back up a table with an auto-incrementing column will fail with the error message "auto_incrementing_id is a reserved column name" from schema.cc. Restoring a table with an auto-incrementing column fails with the same error. Change-Id: I929d54d22c1c938ee67fdda9f4c2bb68c028b4ec Reviewed-on: http://gerrit.cloudera.org:8080/20084 Reviewed-by: Alexey Serbin <ale...@apache.org> Tested-by: Kudu Jenkins (cherry picked from commit 1bdb0105b1ae64c167f32b67ee9da03825605d9c) Reviewed-on: http://gerrit.cloudera.org:8080/20229 Reviewed-by: Marton Greber <greber...@gmail.com> Reviewed-by: Yifan Zhang <chinazhangyi...@163.com> Tested-by: Yingchun Lai <laiyingc...@apache.org> --- .../src/main/protobuf/backup.proto | 1 + .../org/apache/kudu/backup/TableMetadata.scala | 67 +++++++++++++--------- .../org/apache/kudu/backup/TestKuduBackup.scala | 58 +++++++++++++++++++ .../org/apache/kudu/spark/kudu/KuduTestSuite.scala | 8 +++ src/kudu/common/schema.cc | 5 +- 5 files changed, 112 insertions(+), 27 deletions(-) diff --git a/java/kudu-backup-common/src/main/protobuf/backup.proto b/java/kudu-backup-common/src/main/protobuf/backup.proto index 0fa9baecd..77a13b3e3 100644 --- a/java/kudu-backup-common/src/main/protobuf/backup.proto +++ b/java/kudu-backup-common/src/main/protobuf/backup.proto @@ -49,6 +49,7 @@ message ColumnMetadataPB { string compression = 8; int32 block_size = 9; string comment = 10; + bool is_auto_incrementing = 11; } // A human readable string representation of a column value for use diff --git 
a/java/kudu-backup-common/src/main/scala/org/apache/kudu/backup/TableMetadata.scala b/java/kudu-backup-common/src/main/scala/org/apache/kudu/backup/TableMetadata.scala index 3c205be68..b52592f7d 100644 --- a/java/kudu-backup-common/src/main/scala/org/apache/kudu/backup/TableMetadata.scala +++ b/java/kudu-backup-common/src/main/scala/org/apache/kudu/backup/TableMetadata.scala @@ -70,6 +70,7 @@ object TableMetadata { .setCompression(col.getCompressionAlgorithm.toString) .setBlockSize(col.getDesiredBlockSize) .setComment(col.getComment) + .setIsAutoIncrementing(col.isAutoIncrementing) if (col.getTypeAttributes != null) { builder.setTypeAttributes(getTypeAttributesMetadata(col)) } @@ -236,36 +237,50 @@ object TableMetadata { } def getKuduSchema(metadata: TableMetadataPB): Schema = { - val columns = metadata.getColumnsList.asScala.map { col => - val colType = Type.getTypeForName(col.getType) - val builder = new ColumnSchemaBuilder(col.getName, colType) - .key(col.getIsKey) - .nullable(col.getIsNullable) - .encoding(Encoding.valueOf(col.getEncoding)) - .compressionAlgorithm(CompressionAlgorithm.valueOf(col.getCompression)) - .desiredBlockSize(col.getBlockSize) - .comment(col.getComment) - - if (col.hasDefaultValue) { - val value = valueFromString(col.getDefaultValue.getValue, colType) - builder.defaultValue(value) + var IsAutoIncrementingPresent = false + metadata.getColumnsList.asScala.foreach { col => + if (col.getIsAutoIncrementing) { + IsAutoIncrementingPresent = true } + } + val columns = new util.ArrayList[ColumnSchema]() + val colIds = new util.ArrayList[Integer]() + val toId = metadata.getColumnIdsMap.asScala + metadata.getColumnsList.asScala.foreach { col => + if (!col.getIsAutoIncrementing) { + val colType = Type.getTypeForName(col.getType) + val builder = new ColumnSchemaBuilder(col.getName, colType) + .nullable(col.getIsNullable) + .encoding(Encoding.valueOf(col.getEncoding)) + .compressionAlgorithm(CompressionAlgorithm.valueOf(col.getCompression)) + 
.desiredBlockSize(col.getBlockSize) + .comment(col.getComment) + if (IsAutoIncrementingPresent) { + builder.nonUniqueKey(col.getIsKey) + } else { + builder.key(col.getIsKey) + } - if (col.hasTypeAttributes) { - val attributes = col.getTypeAttributes - builder.typeAttributes( - new ColumnTypeAttributesBuilder() - .precision(attributes.getPrecision) - .scale(attributes.getScale) - .length(attributes.getLength) - .build() - ) + if (col.hasDefaultValue) { + val value = valueFromString(col.getDefaultValue.getValue, colType) + builder.defaultValue(value) + } + + if (col.hasTypeAttributes) { + val attributes = col.getTypeAttributes + builder.typeAttributes( + new ColumnTypeAttributesBuilder() + .precision(attributes.getPrecision) + .scale(attributes.getScale) + .length(attributes.getLength) + .build() + ) + } + colIds.add(toId(col.getName)) + columns.add(builder.build()) } - builder.build() } - val toId = metadata.getColumnIdsMap.asScala - val colIds = metadata.getColumnsList.asScala.map(_.getName).map(toId) - new Schema(columns.asJava, colIds.asJava) + new Schema(columns, colIds) } private def getValue(row: PartialRow, columnName: String, colType: Type): Any = { diff --git a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala index 0a61f989c..cc9bcd4de 100644 --- a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala +++ b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala @@ -82,6 +82,64 @@ class TestKuduBackup extends KuduTestSuite { validateTablesMatch(tableName, s"$tableName-restore") } + @Test + def testAutoIncrementingColumnBackupAndRestore() { + val rowCount = 100 + val expectedRowCount = 200 + val simpleAutoIncrementingTableOptions = new CreateTableOptions() + .setRangePartitionColumns(List("key").asJava) + .setNumReplicas(1) + val AutoIncrementingTable = kuduClient.createTable( + simpleAutoIncrementingTableName, + 
simpleAutoIncrementingSchema, + simpleAutoIncrementingTableOptions) + + val session = kuduClient.newSession() + + // Insert some rows. + Range(0, rowCount).foreach { i => + val insert = AutoIncrementingTable.newInsert + val row = insert.getRow + row.addInt("key", i) + row.addString("val", s"a$i") + session.apply(insert) + } + + // Perform a full backup. + backupAndValidateTable(simpleAutoIncrementingTableName, rowCount, false) + // Insert some more rows. + Range(rowCount, 2 * rowCount).foreach { i => + val insert = AutoIncrementingTable.newInsert + val row = insert.getRow + row.addInt("key", i) + row.addString("val", s"a$i") + session.apply(insert) + } + // Perform an incremental backup. + backupAndValidateTable(simpleAutoIncrementingTableName, rowCount, true) + + // Restore the table. + restoreAndValidateTable(simpleAutoIncrementingTableName, expectedRowCount) + // Validate the table schemas match. + validateTablesMatch( + simpleAutoIncrementingTableName, + s"$simpleAutoIncrementingTableName-restore") + + // Validate the data written in the restored table. + val restoreTable = kuduClient.openTable(s"$simpleAutoIncrementingTableName-restore") + val scanner = kuduClient.newScannerBuilder(restoreTable).build() + val rows = scanner.asScala.toList + assertEquals(expectedRowCount, rows.length) + var i = 0 + rows.foreach { row => + assertEquals(i, row.getInt("key")) + assertEquals(i + 1, row.getLong(Schema.getAutoIncrementingColumnName)) + assertEquals(s"a$i", row.getString("val")) + i += 1 + } + assertEquals(expectedRowCount, rows.length) + } + @Test def testSimpleIncrementalBackupAndRestore() { insertRows(table, 100) // Insert data into the default test table. 
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala index e30dbf009..6f5c4fe43 100644 --- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala +++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala @@ -55,6 +55,7 @@ trait KuduTestSuite { val tableName: String = "test" val owner: String = "testuser" val simpleTableName: String = "simple-test" + val simpleAutoIncrementingTableName: String = "simple-auto-incrementing-test" lazy val schema: Schema = { val columns = List( @@ -107,6 +108,13 @@ trait KuduTestSuite { new Schema(columns) } + lazy val simpleAutoIncrementingSchema: Schema = { + val columns = List( + new ColumnSchemaBuilder("key", Type.INT32).nonUniqueKey(true).build(), + new ColumnSchemaBuilder("val", Type.STRING).nullable(true).build()).asJava + new Schema(columns) + } + val tableOptions: CreateTableOptions = { val bottom = schema.newPartialRow() // Unbounded. val middle = schema.newPartialRow() diff --git a/src/kudu/common/schema.cc b/src/kudu/common/schema.cc index 679b3548a..7a122cada 100644 --- a/src/kudu/common/schema.cc +++ b/src/kudu/common/schema.cc @@ -310,8 +310,11 @@ Status Schema::Reset(vector<ColumnSchema> cols, if (col.name().empty()) { return Status::InvalidArgument("column names must be non-empty"); } + // We have to check for the number of key columns here as + // ColumnSchema.getStrippedColumnSchema() would trigger the exception + // otherwise. if (col.name() == Schema::GetAutoIncrementingColumnName() && - !col.is_auto_incrementing()) { + !col.is_auto_incrementing() && num_key_columns_ != 0) { return Status::InvalidArgument(Substitute( "$0 is a reserved column name", Schema::GetAutoIncrementingColumnName())); }