Test for backup/restore multiple tables

This is a prerequisite for testing parallel backup/restore and error handling.
- Extracted `createTable` method so it can be re-used - Added `tableName` argument to `insertRows` so it can be re-used Change-Id: I439e0d1c6904395ad382e3f059846b3b03a79af4 Reviewed-on: http://gerrit.cloudera.org:8080/10899 Tested-by: Kudu Jenkins Reviewed-by: Adar Dembo <a...@cloudera.com> Reviewed-by: Grant Henke <granthe...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/f62bbdd6 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/f62bbdd6 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/f62bbdd6 Branch: refs/heads/master Commit: f62bbdd64d1c039b91edbd1d55a1607abd023f43 Parents: fff3cf0 Author: Tony Foerster <t...@phdata.io> Authored: Tue Jul 10 07:08:52 2018 -0500 Committer: Grant Henke <granthe...@apache.org> Committed: Thu Jul 12 19:17:09 2018 +0000 ---------------------------------------------------------------------- .../org/apache/kudu/backup/TestKuduBackup.scala | 28 +++++++++++++++++--- .../kudu/spark/kudu/DefaultSourceTest.scala | 2 +- .../kudu/spark/kudu/KuduContextTest.scala | 4 +-- .../apache/kudu/spark/kudu/KuduRDDTest.scala | 2 +- .../apache/kudu/spark/kudu/TestContext.scala | 24 ++++++++++------- 5 files changed, 42 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/f62bbdd6/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala ---------------------------------------------------------------------- diff --git a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala index 2e2e589..1c166bd 100644 --- a/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala +++ b/java/kudu-backup/src/test/scala/org/apache/kudu/backup/TestKuduBackup.scala @@ -42,7 +42,7 @@ class TestKuduBackup extends FunSuite with TestContext with Matchers { 
val log: Logger = LoggerFactory.getLogger(getClass) test("Simple Backup and Restore") { - insertRows(100) // Insert data into the default test table. + insertRows(table, 100) // Insert data into the default test table. backupAndRestore(tableName) @@ -93,6 +93,26 @@ class TestKuduBackup extends FunSuite with TestContext with Matchers { assertTrue(partitionSchemasMatch(tA.getPartitionSchema, tB.getPartitionSchema)) } + test("Backup and Restore Multiple Tables") { + val numRows = 1 + val table1Name = "table1" + val table2Name = "table2" + + val table1 = kuduClient.createTable(table1Name, schema, tableOptions) + val table2 = kuduClient.createTable(table2Name, schema, tableOptions) + + insertRows(table1, numRows) + insertRows(table2, numRows) + + backupAndRestore(table1Name, table2Name) + + val rdd1 = kuduContext.kuduRDD(ss.sparkContext, s"$table1Name-restore", List("key")) + assertResult(numRows)(rdd1.count()) + + val rdd2 = kuduContext.kuduRDD(ss.sparkContext, s"$table2Name-restore", List("key")) + assertResult(numRows)(rdd2.count()) + } + // TODO: Move to a Schema equals/equivalent method. 
def schemasMatch(before: Schema, after: Schema): Boolean = { if (before eq after) return true @@ -299,14 +319,14 @@ class TestKuduBackup extends FunSuite with TestContext with Matchers { } } - def backupAndRestore(tableName: String): Unit = { + def backupAndRestore(tableNames: String*): Unit = { val dir = Files.createTempDirectory("backup") val path = dir.toUri.toString - val backupOptions = new KuduBackupOptions(Seq(tableName), path, miniCluster.getMasterAddresses) + val backupOptions = new KuduBackupOptions(tableNames, path, miniCluster.getMasterAddresses) KuduBackup.run(backupOptions, ss) - val restoreOptions = new KuduRestoreOptions(Seq(tableName), path, miniCluster.getMasterAddresses) + val restoreOptions = new KuduRestoreOptions(tableNames, path, miniCluster.getMasterAddresses) KuduRestore.run(restoreOptions, ss) FileUtils.deleteDirectory(dir.toFile) http://git-wip-us.apache.org/repos/asf/kudu/blob/f62bbdd6/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala ---------------------------------------------------------------------- diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala index 4e7d5a4..ec4d37c 100644 --- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala +++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala @@ -41,7 +41,7 @@ class DefaultSourceTest extends FunSuite with TestContext with BeforeAndAfterEac override def beforeEach(): Unit = { super.beforeEach() - rows = insertRows(rowCount) + rows = insertRows(table, rowCount) sqlContext = ss.sqlContext http://git-wip-us.apache.org/repos/asf/kudu/blob/f62bbdd6/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduContextTest.scala ---------------------------------------------------------------------- diff --git 
a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduContextTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduContextTest.scala index ce1059b..4915002 100644 --- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduContextTest.scala +++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduContextTest.scala @@ -60,7 +60,7 @@ class KuduContextTest extends FunSuite with TestContext with Matchers { } test("Test basic kuduRDD") { - val rows = insertRows(rowCount) + val rows = insertRows(table, rowCount) val scanList = kuduContext.kuduRDD(ss.sparkContext, "test", Seq("key", "c1_i", "c2_s", "c3_double", "c4_long", "c5_bool", "c6_short", "c7_float", "c8_binary", "c9_unixtime_micros", "c10_byte", "c11_decimal32", "c12_decimal64", "c13_decimal128")) @@ -87,7 +87,7 @@ class KuduContextTest extends FunSuite with TestContext with Matchers { } test("Test kudu-spark DataFrame") { - insertRows(rowCount) + insertRows(table, rowCount) val sqlContext = ss.sqlContext val dataDF = sqlContext.read.options(Map("kudu.master" -> miniCluster.getMasterAddresses, "kudu.table" -> "test")).kudu http://git-wip-us.apache.org/repos/asf/kudu/blob/f62bbdd6/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduRDDTest.scala ---------------------------------------------------------------------- diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduRDDTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduRDDTest.scala index fbd7bbb..56ff412 100644 --- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduRDDTest.scala +++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduRDDTest.scala @@ -25,7 +25,7 @@ import org.scalatest.{BeforeAndAfter, FunSuite} class KuduRDDTest extends FunSuite with TestContext with BeforeAndAfter { test("collect rows") { - insertRows(100) + insertRows(table, 100) val rdd = kuduContext.kuduRDD(ss.sparkContext, tableName, List("key")) 
assert(rdd.collect.length == 100) } http://git-wip-us.apache.org/repos/asf/kudu/blob/f62bbdd6/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/TestContext.scala ---------------------------------------------------------------------- diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/TestContext.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/TestContext.scala index 3ef0e45..a3247da 100644 --- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/TestContext.scala +++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/TestContext.scala @@ -98,23 +98,27 @@ trait TestContext extends BeforeAndAfterEach { self: Suite => kuduContext = new KuduContext(miniCluster.getMasterAddresses, ss.sparkContext) + table = kuduClient.createTable(tableName, schema, tableOptions) + + + val simpleTableOptions = new CreateTableOptions() + .setRangePartitionColumns(List("key").asJava) + .setNumReplicas(1) + + kuduClient.createTable(simpleTableName, simpleSchema, simpleTableOptions) + } + + val tableOptions: CreateTableOptions = { val bottom = schema.newPartialRow() // Unbounded. val middle = schema.newPartialRow() middle.addInt("key", 50) val top = schema.newPartialRow() // Unbounded. 
- val tableOptions = new CreateTableOptions() + new CreateTableOptions() .setRangePartitionColumns(List("key").asJava) .addRangePartition(bottom, middle) .addRangePartition(middle, top) .setNumReplicas(1) - table = kuduClient.createTable(tableName, schema, tableOptions) - - val simpleTableOptions = new CreateTableOptions() - .setRangePartitionColumns(List("key").asJava) - .setNumReplicas(1) - - kuduClient.createTable(simpleTableName, simpleSchema, simpleTableOptions) } override def afterEach() { @@ -130,11 +134,11 @@ trait TestContext extends BeforeAndAfterEach { self: Suite => kuduSession.apply(delete) } - def insertRows(rowCount: Integer): IndexedSeq[(Int, Int, String, Long)] = { + def insertRows(targetTable: KuduTable, rowCount: Integer): IndexedSeq[(Int, Int, String, Long)] = { val kuduSession = kuduClient.newSession() val rows = Range(0, rowCount).map { i => - val insert = table.newInsert + val insert = targetTable.newInsert val row = insert.getRow row.addInt(0, i) row.addInt(1, i)