This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 21e6329d16 [procedure] Enhance return result for rollback procedure (#5533)
21e6329d16 is described below
commit 21e6329d1640a7eeabac973895693780afbd9941
Author: askwang <[email protected]>
AuthorDate: Fri Apr 25 13:19:56 2025 +0800
[procedure] Enhance return result for rollback procedure (#5533)
---
.../flink/procedure/RollbackToProcedure.java | 17 ++++-
.../procedure/RollbackToTimestampProcedure.java | 13 +++-
.../flink/procedure/RollbackProcedureITCase.java | 88 ++++++++++++++++++++++
.../paimon/spark/sql/RollbackProcedureTest.scala | 5 +-
.../paimon/spark/procedure/RollbackProcedure.java | 22 +++++-
.../procedure/RollbackToTimestampProcedure.java | 28 ++++---
.../procedure/RollbackToWatermarkProcedure.java | 28 ++++---
.../spark/procedure/RollbackProcedureTest.scala | 16 ++--
8 files changed, 181 insertions(+), 36 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToProcedure.java
index 9bca6505c9..8320453f8e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToProcedure.java
@@ -18,15 +18,20 @@
package org.apache.paimon.flink.procedure;
+import org.apache.paimon.FileStore;
+import org.apache.paimon.Snapshot;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
+import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.StringUtils;
import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.ProcedureHint;
import org.apache.flink.table.procedure.ProcedureContext;
+import org.apache.flink.types.Row;
/**
* Rollback procedure. Usage:
@@ -52,16 +57,24 @@ public class RollbackToProcedure extends ProcedureBase {
type = @DataTypeHint("BIGINT"),
isOptional = true)
})
- public String[] call(
+ public @DataTypeHint("ROW<previous_snapshot_id BIGINT, current_snapshot_id
BIGINT>") Row[] call(
ProcedureContext procedureContext, String tableId, String tagName,
Long snapshotId)
throws Catalog.TableNotExistException {
Table table = catalog.getTable(Identifier.fromString(tableId));
+
+ FileStore<?> store = ((FileStoreTable) table).store();
+ Snapshot latestSnapshot = store.snapshotManager().latestSnapshot();
+ Preconditions.checkNotNull(latestSnapshot, "Latest snapshot is null,
can not rollback.");
+
+ long rollbackSnapshotId;
if (!StringUtils.isNullOrWhitespaceOnly(tagName)) {
table.rollbackTo(tagName);
+ rollbackSnapshotId =
store.newTagManager().getOrThrow(tagName).trimToSnapshot().id();
} else {
table.rollbackTo(snapshotId);
+ rollbackSnapshotId = snapshotId;
}
- return new String[] {"Success"};
+ return new Row[] {Row.of(latestSnapshot.id(), rollbackSnapshotId)};
}
@Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToTimestampProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToTimestampProcedure.java
index f84dab8eab..a0b212a42d 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToTimestampProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToTimestampProcedure.java
@@ -24,11 +24,13 @@ import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.SnapshotManager;
import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.ProcedureHint;
import org.apache.flink.table.procedure.ProcedureContext;
+import org.apache.flink.types.Row;
/**
* Rollback to timestamp procedure. Usage:
@@ -47,16 +49,21 @@ public class RollbackToTimestampProcedure extends ProcedureBase {
@ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
@ArgumentHint(name = "timestamp", type =
@DataTypeHint("BIGINT"))
})
- public String[] call(ProcedureContext procedureContext, String tableId,
Long timestamp)
+ public @DataTypeHint("ROW<previous_snapshot_id BIGINT, current_snapshot_id
BIGINT>") Row[] call(
+ ProcedureContext procedureContext, String tableId, Long timestamp)
throws Catalog.TableNotExistException {
Table table = catalog.getTable(Identifier.fromString(tableId));
FileStoreTable fileStoreTable = (FileStoreTable) table;
- Snapshot snapshot =
fileStoreTable.snapshotManager().earlierOrEqualTimeMills(timestamp);
+ SnapshotManager snapshotManager = fileStoreTable.snapshotManager();
+ Snapshot latestSnapshot = snapshotManager.latestSnapshot();
+ Preconditions.checkNotNull(latestSnapshot, "Latest snapshot is null,
can not rollback.");
+
+ Snapshot snapshot = snapshotManager.earlierOrEqualTimeMills(timestamp);
Preconditions.checkNotNull(
snapshot, String.format("count not find snapshot earlier than
%s", timestamp));
long snapshotId = snapshot.id();
fileStoreTable.rollbackTo(snapshotId);
- return new String[] {String.format("Success roll back to snapshot: %s
.", snapshotId)};
+ return new Row[] {Row.of(latestSnapshot.id(), snapshotId)};
}
@Override
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RollbackProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RollbackProcedureITCase.java
new file mode 100644
index 0000000000..884237a16f
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RollbackProcedureITCase.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.flink.CatalogITCaseBase;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.SnapshotManager;
+
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** IT Case for {@link RollbackToProcedure} and {@link RollbackToTimestampProcedure}. */
+public class RollbackProcedureITCase extends CatalogITCaseBase {
+
+ @Test
+ public void testRollbackTo() throws Exception {
+ sql(
+ "CREATE TABLE T (id STRING, name STRING,"
+ + " PRIMARY KEY (id) NOT ENFORCED)"
+ + " WITH ('bucket'='1', 'write-only'='true')");
+
+ FileStoreTable table = paimonTable("T");
+ SnapshotManager snapshotManager = table.snapshotManager();
+
+ for (int i = 1; i <= 5; i++) {
+ sql("INSERT INTO T VALUES ('" + i + "', '" + i + "')");
+ }
+ assertEquals(5, snapshotManager.latestSnapshotId());
+
+ sql("CALL sys.create_tag(`table` => 'default.T', tag => 'tag-2',
snapshot_id => 2)");
+
+ // rollback to snapshot_id
+ long latestSnapshotId = snapshotManager.latestSnapshot().id();
+ assertThat(sql("CALL sys.rollback_to(`table` => 'default.T',
snapshot_id => 4)"))
+ .containsExactly(Row.of(latestSnapshotId, 4L));
+
+ // rollback to tag
+ latestSnapshotId = snapshotManager.latestSnapshot().id();
+ assertThat(sql("CALL sys.rollback_to(`table` => 'default.T', tag =>
'tag-2')"))
+ .containsExactly(Row.of(latestSnapshotId, 2L));
+ }
+
+ @Test
+ public void testRollbackToTimestamp() throws Exception {
+ sql(
+ "CREATE TABLE T (id STRING, name STRING,"
+ + " PRIMARY KEY (id) NOT ENFORCED)"
+ + " WITH ('bucket'='1', 'write-only'='true')");
+
+ FileStoreTable table = paimonTable("T");
+ SnapshotManager snapshotManager = table.snapshotManager();
+
+ sql("INSERT INTO T VALUES ('1', 'a')");
+ sql("INSERT INTO T VALUES ('1', 'b')");
+ long timestamp = System.currentTimeMillis();
+
+ sql("INSERT INTO T VALUES ('3', 'c')");
+ assertEquals(3, snapshotManager.latestSnapshotId());
+
+ // rollback to timestamp
+ long latestSnapshotId = snapshotManager.latestSnapshot().id();
+ assertThat(
+ sql(
+ String.format(
+ "CALL
sys.rollback_to_timestamp(`table` => 'default.T', `timestamp` => %s)",
+ timestamp)))
+ .containsExactly(Row.of(latestSnapshotId, 2L));
+ }
+}
diff --git a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/RollbackProcedureTest.scala b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/RollbackProcedureTest.scala
index 7a3a5730ed..605b4cadf6 100644
--- a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/RollbackProcedureTest.scala
+++ b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/RollbackProcedureTest.scala
@@ -51,6 +51,7 @@ class RollbackProcedureTest extends PaimonSparkTestBase with StreamTest {
}
.start()
+ val table = loadTable("T")
val query = () => spark.sql("SELECT * FROM T ORDER BY a")
try {
@@ -79,13 +80,13 @@ class RollbackProcedureTest extends PaimonSparkTestBase with StreamTest {
// rollback to snapshot
checkAnswer(
spark.sql("CALL paimon.sys.rollback(table => 'test.T', version
=> '2')"),
- Row(true) :: Nil)
+ Row(table.latestSnapshot().get().id, 2) :: Nil)
checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil)
// rollback to tag
checkAnswer(
spark.sql("CALL paimon.sys.rollback(table => 'test.T', version
=> 'test_tag')"),
- Row(true) :: Nil)
+ Row(table.latestSnapshot().get().id, 1) :: Nil)
checkAnswer(query(), Row(1, "a") :: Nil)
} finally {
stream.stop()
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackProcedure.java
index d9a8876332..61060554dc 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackProcedure.java
@@ -18,6 +18,9 @@
package org.apache.paimon.spark.procedure;
+import org.apache.paimon.FileStore;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.StringUtils;
@@ -47,7 +50,13 @@ public class RollbackProcedure extends BaseProcedure {
private static final StructType OUTPUT_TYPE =
new StructType(
new StructField[] {
- new StructField("result", DataTypes.BooleanType, true,
Metadata.empty())
+ new StructField(
+ "previous_snapshot_id",
+ DataTypes.LongType,
+ false,
+ Metadata.empty()),
+ new StructField(
+ "current_snapshot_id", DataTypes.LongType,
false, Metadata.empty())
});
private RollbackProcedure(TableCatalog tableCatalog) {
@@ -92,12 +101,21 @@ public class RollbackProcedure extends BaseProcedure {
tag = args.isNullAt(3) ? null : args.getString(3);
}
+ FileStore<?> store = ((FileStoreTable) table).store();
+ Snapshot latestSnapshot =
store.snapshotManager().latestSnapshot();
+ Preconditions.checkNotNull(
+ latestSnapshot, "Latest snapshot is null, can not
rollback.");
+
+ long currentSnapshotId;
if (snapshot != null) {
table.rollbackTo(snapshot);
+ currentSnapshotId = snapshot;
} else {
table.rollbackTo(tag);
+ currentSnapshotId =
+
store.newTagManager().getOrThrow(tag).trimToSnapshot().id();
}
- InternalRow outputRow = newInternalRow(true);
+ InternalRow outputRow =
newInternalRow(latestSnapshot.id(), currentSnapshotId);
return new InternalRow[] {outputRow};
});
}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackToTimestampProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackToTimestampProcedure.java
index a01f08b3fc..d0dae70484 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackToTimestampProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackToTimestampProcedure.java
@@ -21,14 +21,15 @@ package org.apache.paimon.spark.procedure;
import org.apache.paimon.Snapshot;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.SnapshotManager;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
-import org.apache.spark.unsafe.types.UTF8String;
import static org.apache.spark.sql.types.DataTypes.LongType;
import static org.apache.spark.sql.types.DataTypes.StringType;
@@ -46,7 +47,13 @@ public class RollbackToTimestampProcedure extends BaseProcedure {
private static final StructType OUTPUT_TYPE =
new StructType(
new StructField[] {
- new StructField("result", StringType, true,
Metadata.empty())
+ new StructField(
+ "previous_snapshot_id",
+ DataTypes.LongType,
+ false,
+ Metadata.empty()),
+ new StructField(
+ "current_snapshot_id", DataTypes.LongType,
false, Metadata.empty())
});
private RollbackToTimestampProcedure(TableCatalog tableCatalog) {
@@ -72,19 +79,18 @@ public class RollbackToTimestampProcedure extends BaseProcedure {
tableIdent,
table -> {
FileStoreTable fileStoreTable = (FileStoreTable) table;
- Snapshot snapshot =
-
fileStoreTable.snapshotManager().earlierOrEqualTimeMills(timestamp);
+ SnapshotManager snapshotManager =
fileStoreTable.snapshotManager();
+ Snapshot latestSnapshot = snapshotManager.latestSnapshot();
+ Preconditions.checkNotNull(
+ latestSnapshot, "Latest snapshot is null, can not
rollback.");
+
+ Snapshot snapshot =
snapshotManager.earlierOrEqualTimeMills(timestamp);
Preconditions.checkNotNull(
snapshot,
- String.format("count not find snapshot earlier
than %s", timestamp));
+ String.format("Can not find snapshot earlier than
%s", timestamp));
long snapshotId = snapshot.id();
fileStoreTable.rollbackTo(snapshotId);
- InternalRow outputRow =
- newInternalRow(
- UTF8String.fromString(
- String.format(
- "Success roll back to
snapshot: %s .",
- snapshotId)));
+ InternalRow outputRow =
newInternalRow(latestSnapshot.id(), snapshotId);
return new InternalRow[] {outputRow};
});
}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackToWatermarkProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackToWatermarkProcedure.java
index 09185f02c9..ac11fa2f32 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackToWatermarkProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RollbackToWatermarkProcedure.java
@@ -21,14 +21,15 @@ package org.apache.paimon.spark.procedure;
import org.apache.paimon.Snapshot;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.SnapshotManager;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
-import org.apache.spark.unsafe.types.UTF8String;
import static org.apache.spark.sql.types.DataTypes.LongType;
import static org.apache.spark.sql.types.DataTypes.StringType;
@@ -46,7 +47,13 @@ public class RollbackToWatermarkProcedure extends BaseProcedure {
private static final StructType OUTPUT_TYPE =
new StructType(
new StructField[] {
- new StructField("result", StringType, true,
Metadata.empty())
+ new StructField(
+ "previous_snapshot_id",
+ DataTypes.LongType,
+ false,
+ Metadata.empty()),
+ new StructField(
+ "current_snapshot_id", DataTypes.LongType,
false, Metadata.empty())
});
private RollbackToWatermarkProcedure(TableCatalog tableCatalog) {
@@ -72,19 +79,18 @@ public class RollbackToWatermarkProcedure extends BaseProcedure {
tableIdent,
table -> {
FileStoreTable fileStoreTable = (FileStoreTable) table;
- Snapshot snapshot =
-
fileStoreTable.snapshotManager().earlierOrEqualWatermark(watermark);
+ SnapshotManager snapshotManager =
fileStoreTable.snapshotManager();
+ Snapshot latestSnapshot = snapshotManager.latestSnapshot();
+ Preconditions.checkNotNull(
+ latestSnapshot, "Latest snapshot is null, can not
rollback.");
+
+ Snapshot snapshot =
snapshotManager.earlierOrEqualWatermark(watermark);
Preconditions.checkNotNull(
snapshot,
- String.format("count not find snapshot earlier
than %s", watermark));
+ String.format("Can not find snapshot earlier than
%s", watermark));
long snapshotId = snapshot.id();
fileStoreTable.rollbackTo(snapshotId);
- InternalRow outputRow =
- newInternalRow(
- UTF8String.fromString(
- String.format(
- "Success roll back to
snapshot: %s .",
- snapshotId)));
+ InternalRow outputRow =
newInternalRow(latestSnapshot.id(), snapshotId);
return new InternalRow[] {outputRow};
});
}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala
index dde0af3d22..66f2d57e02 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala
@@ -37,7 +37,8 @@ class RollbackProcedureTest extends PaimonSparkTestBase with StreamTest {
|CREATE TABLE T (a INT, b STRING)
|TBLPROPERTIES ('primary-key'='a', 'bucket'='3')
|""".stripMargin)
- val location = loadTable("T").location().toString
+ val table = loadTable("T")
+ val location = table.location().toString
val inputData = MemoryStream[(Int, String)]
val stream = inputData
@@ -79,13 +80,14 @@ class RollbackProcedureTest extends PaimonSparkTestBase with StreamTest {
// rollback to snapshot
checkAnswer(
spark.sql("CALL paimon.sys.rollback(table => 'test.T', version
=> '2')"),
- Row(true) :: Nil)
+ Row(table.latestSnapshot().get().id, 2) :: Nil)
checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil)
// rollback to tag
+ val taggedSnapshotId =
table.tagManager().getOrThrow("test_tag").trimToSnapshot().id
checkAnswer(
spark.sql("CALL paimon.sys.rollback(table => 'test.T', version
=> 'test_tag')"),
- Row(true) :: Nil)
+ Row(table.latestSnapshot().get().id, taggedSnapshotId) :: Nil)
checkAnswer(query(), Row(1, "a") :: Nil)
} finally {
stream.stop()
@@ -100,6 +102,8 @@ class RollbackProcedureTest extends PaimonSparkTestBase with StreamTest {
|TBLPROPERTIES ('primary-key'='a', 'bucket'='3',
'file.format'='orc')
|""".stripMargin)
+ val table = loadTable("T")
+
val query = () => spark.sql("SELECT * FROM T ORDER BY a")
// snapshot-1
@@ -128,7 +132,7 @@ class RollbackProcedureTest extends PaimonSparkTestBase with StreamTest {
// rollback to snapshot
checkAnswer(
spark.sql("CALL paimon.sys.rollback(table => 'test.T', version => '3')"),
- Row(true) :: Nil)
+ Row(table.latestSnapshot().get().id, 3) :: Nil)
checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Row(3, "c") :: Nil)
// version/snapshot/tag can only set one of them
@@ -177,6 +181,8 @@ class RollbackProcedureTest extends PaimonSparkTestBase with StreamTest {
}
.start()
+ val table = loadTable("T")
+
val query = () => spark.sql("SELECT * FROM T ORDER BY a")
try {
@@ -201,7 +207,7 @@ class RollbackProcedureTest extends PaimonSparkTestBase with StreamTest {
checkAnswer(
spark.sql(
s"CALL paimon.sys.rollback_to_timestamp(table => 'test.T',
timestamp => $timestamp)"),
- Row("Success roll back to snapshot: 2 .") :: Nil)
+ Row(table.latestSnapshot().get().id, 2) :: Nil)
checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil)
} finally {