This is an automated email from the ASF dual-hosted git repository.

laiyingchun pushed a commit to branch branch-1.17.x
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 3eaa205213ff5c6e885b9fc0f3cfb9098d3e2231
Author: Abhishek Chennaka <achenn...@cloudera.com>
AuthorDate: Thu Jun 15 21:39:50 2023 -0700

    KUDU-1945 Backup/restore for tables with auto incrementing columns
    
    This patch adds the ability to backup and restore tables containing
    auto-incrementing columns. A simple test case is also added to test
    the functionality.
    
    Without this patch, any attempt to
    back up a table with an auto-incrementing column will fail with an
    error message "auto_incrementing_id is a reserved column name" from
    schema.cc.
    Restoring a table with an auto-incrementing column will result in
    the same behavior as with the backup.
    
    Change-Id: I929d54d22c1c938ee67fdda9f4c2bb68c028b4ec
    Reviewed-on: http://gerrit.cloudera.org:8080/20084
    Reviewed-by: Alexey Serbin <ale...@apache.org>
    Tested-by: Kudu Jenkins
    (cherry picked from commit 1bdb0105b1ae64c167f32b67ee9da03825605d9c)
    Reviewed-on: http://gerrit.cloudera.org:8080/20229
    Reviewed-by: Marton Greber <greber...@gmail.com>
    Reviewed-by: Yifan Zhang <chinazhangyi...@163.com>
    Tested-by: Yingchun Lai <laiyingc...@apache.org>
---
 .../src/main/protobuf/backup.proto                 |  1 +
 .../org/apache/kudu/backup/TableMetadata.scala     | 67 +++++++++++++---------
 .../org/apache/kudu/backup/TestKuduBackup.scala    | 58 +++++++++++++++++++
 .../org/apache/kudu/spark/kudu/KuduTestSuite.scala |  8 +++
 src/kudu/common/schema.cc                          |  5 +-
 5 files changed, 112 insertions(+), 27 deletions(-)

diff --git a/java/kudu-backup-common/src/main/protobuf/backup.proto 
b/java/kudu-backup-common/src/main/protobuf/backup.proto
index 0fa9baecd..77a13b3e3 100644
--- a/java/kudu-backup-common/src/main/protobuf/backup.proto
+++ b/java/kudu-backup-common/src/main/protobuf/backup.proto
@@ -49,6 +49,7 @@ message ColumnMetadataPB {
   string compression = 8;
   int32 block_size = 9;
   string comment = 10;
+  bool is_auto_incrementing = 11;
 }
 
 // A human readable string representation of a column value for use
diff --git 
a/java/kudu-backup-common/src/main/scala/org/apache/kudu/backup/TableMetadata.scala
 
b/java/kudu-backup-common/src/main/scala/org/apache/kudu/backup/TableMetadata.scala
index 3c205be68..b52592f7d 100644
--- 
a/java/kudu-backup-common/src/main/scala/org/apache/kudu/backup/TableMetadata.scala
+++ 
b/java/kudu-backup-common/src/main/scala/org/apache/kudu/backup/TableMetadata.scala
@@ -70,6 +70,7 @@ object TableMetadata {
         .setCompression(col.getCompressionAlgorithm.toString)
         .setBlockSize(col.getDesiredBlockSize)
         .setComment(col.getComment)
+        .setIsAutoIncrementing(col.isAutoIncrementing)
       if (col.getTypeAttributes != null) {
         builder.setTypeAttributes(getTypeAttributesMetadata(col))
       }
@@ -236,36 +237,50 @@ object TableMetadata {
   }
 
   def getKuduSchema(metadata: TableMetadataPB): Schema = {
-    val columns = metadata.getColumnsList.asScala.map { col =>
-      val colType = Type.getTypeForName(col.getType)
-      val builder = new ColumnSchemaBuilder(col.getName, colType)
-        .key(col.getIsKey)
-        .nullable(col.getIsNullable)
-        .encoding(Encoding.valueOf(col.getEncoding))
-        .compressionAlgorithm(CompressionAlgorithm.valueOf(col.getCompression))
-        .desiredBlockSize(col.getBlockSize)
-        .comment(col.getComment)
-
-      if (col.hasDefaultValue) {
-        val value = valueFromString(col.getDefaultValue.getValue, colType)
-        builder.defaultValue(value)
+    var IsAutoIncrementingPresent = false
+    metadata.getColumnsList.asScala.foreach { col =>
+      if (col.getIsAutoIncrementing) {
+        IsAutoIncrementingPresent = true
       }
+    }
+    val columns = new util.ArrayList[ColumnSchema]()
+    val colIds = new util.ArrayList[Integer]()
+    val toId = metadata.getColumnIdsMap.asScala
+    metadata.getColumnsList.asScala.foreach { col =>
+      if (!col.getIsAutoIncrementing) {
+        val colType = Type.getTypeForName(col.getType)
+        val builder = new ColumnSchemaBuilder(col.getName, colType)
+          .nullable(col.getIsNullable)
+          .encoding(Encoding.valueOf(col.getEncoding))
+          
.compressionAlgorithm(CompressionAlgorithm.valueOf(col.getCompression))
+          .desiredBlockSize(col.getBlockSize)
+          .comment(col.getComment)
+        if (IsAutoIncrementingPresent) {
+          builder.nonUniqueKey(col.getIsKey)
+        } else {
+          builder.key(col.getIsKey)
+        }
 
-      if (col.hasTypeAttributes) {
-        val attributes = col.getTypeAttributes
-        builder.typeAttributes(
-          new ColumnTypeAttributesBuilder()
-            .precision(attributes.getPrecision)
-            .scale(attributes.getScale)
-            .length(attributes.getLength)
-            .build()
-        )
+        if (col.hasDefaultValue) {
+          val value = valueFromString(col.getDefaultValue.getValue, colType)
+          builder.defaultValue(value)
+        }
+
+        if (col.hasTypeAttributes) {
+          val attributes = col.getTypeAttributes
+          builder.typeAttributes(
+            new ColumnTypeAttributesBuilder()
+              .precision(attributes.getPrecision)
+              .scale(attributes.getScale)
+              .length(attributes.getLength)
+              .build()
+          )
+        }
+        colIds.add(toId(col.getName))
+        columns.add(builder.build())
       }
-      builder.build()
     }
-    val toId = metadata.getColumnIdsMap.asScala
-    val colIds = metadata.getColumnsList.asScala.map(_.getName).map(toId)
-    new Schema(columns.asJava, colIds.asJava)
+    new Schema(columns, colIds)
   }
 
   private def getValue(row: PartialRow, columnName: String, colType: Type): 
Any = {
diff --git 
a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala 
b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
index 0a61f989c..cc9bcd4de 100644
--- 
a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
+++ 
b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala
@@ -82,6 +82,64 @@ class TestKuduBackup extends KuduTestSuite {
     validateTablesMatch(tableName, s"$tableName-restore")
   }
 
+  @Test
+  def testAutoIncrementingColumnBackupAndRestore() {
+    val rowCount = 100
+    val expectedRowCount = 200
+    val simpleAutoIncrementingTableOptions = new CreateTableOptions()
+      .setRangePartitionColumns(List("key").asJava)
+      .setNumReplicas(1)
+    val AutoIncrementingTable = kuduClient.createTable(
+      simpleAutoIncrementingTableName,
+      simpleAutoIncrementingSchema,
+      simpleAutoIncrementingTableOptions)
+
+    val session = kuduClient.newSession()
+
+    // Insert some rows.
+    Range(0, rowCount).foreach { i =>
+      val insert = AutoIncrementingTable.newInsert
+      val row = insert.getRow
+      row.addInt("key", i)
+      row.addString("val", s"a$i")
+      session.apply(insert)
+    }
+
+    // Perform a full backup.
+    backupAndValidateTable(simpleAutoIncrementingTableName, rowCount, false)
+    // Insert some more rows.
+    Range(rowCount, 2 * rowCount).foreach { i =>
+      val insert = AutoIncrementingTable.newInsert
+      val row = insert.getRow
+      row.addInt("key", i)
+      row.addString("val", s"a$i")
+      session.apply(insert)
+    }
+    // Perform an incremental backup.
+    backupAndValidateTable(simpleAutoIncrementingTableName, rowCount, true)
+
+    // Restore the table.
+    restoreAndValidateTable(simpleAutoIncrementingTableName, expectedRowCount)
+    // Validate the table schemas match.
+    validateTablesMatch(
+      simpleAutoIncrementingTableName,
+      s"$simpleAutoIncrementingTableName-restore")
+
+    // Validate the data written in the restored table.
+    val restoreTable = 
kuduClient.openTable(s"$simpleAutoIncrementingTableName-restore")
+    val scanner = kuduClient.newScannerBuilder(restoreTable).build()
+    val rows = scanner.asScala.toList
+    assertEquals(expectedRowCount, rows.length)
+    var i = 0
+    rows.foreach { row =>
+      assertEquals(i, row.getInt("key"))
+      assertEquals(i + 1, row.getLong(Schema.getAutoIncrementingColumnName))
+      assertEquals(s"a$i", row.getString("val"))
+      i += 1
+    }
+    assertEquals(expectedRowCount, rows.length)
+  }
+
   @Test
   def testSimpleIncrementalBackupAndRestore() {
     insertRows(table, 100) // Insert data into the default test table.
diff --git 
a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala 
b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala
index e30dbf009..6f5c4fe43 100644
--- 
a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala
+++ 
b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduTestSuite.scala
@@ -55,6 +55,7 @@ trait KuduTestSuite {
   val tableName: String = "test"
   val owner: String = "testuser"
   val simpleTableName: String = "simple-test"
+  val simpleAutoIncrementingTableName: String = "simple-auto-incrementing-test"
 
   lazy val schema: Schema = {
     val columns = List(
@@ -107,6 +108,13 @@ trait KuduTestSuite {
     new Schema(columns)
   }
 
+  lazy val simpleAutoIncrementingSchema: Schema = {
+    val columns = List(
+      new ColumnSchemaBuilder("key", Type.INT32).nonUniqueKey(true).build(),
+      new ColumnSchemaBuilder("val", 
Type.STRING).nullable(true).build()).asJava
+    new Schema(columns)
+  }
+
   val tableOptions: CreateTableOptions = {
     val bottom = schema.newPartialRow() // Unbounded.
     val middle = schema.newPartialRow()
diff --git a/src/kudu/common/schema.cc b/src/kudu/common/schema.cc
index 679b3548a..7a122cada 100644
--- a/src/kudu/common/schema.cc
+++ b/src/kudu/common/schema.cc
@@ -310,8 +310,11 @@ Status Schema::Reset(vector<ColumnSchema> cols,
     if (col.name().empty()) {
       return Status::InvalidArgument("column names must be non-empty");
     }
+    // We have to check for the number of key columns here as
+    // ColumnSchema.getStrippedColumnSchema() would trigger the exception
+    // otherwise.
     if (col.name() == Schema::GetAutoIncrementingColumnName() &&
-        !col.is_auto_incrementing()) {
+        !col.is_auto_incrementing() && num_key_columns_ != 0) {
       return Status::InvalidArgument(Substitute(
           "$0 is a reserved column name", 
Schema::GetAutoIncrementingColumnName()));
     }

Reply via email to