aokolnychyi commented on a change in pull request #1473:
URL: https://github.com/apache/iceberg/pull/1473#discussion_r493761993



##########
File path: spark3/src/main/java/org/apache/iceberg/spark/procedures/RollbackToSnapshotProcedure.java
##########
@@ -0,0 +1,74 @@
+
+package org.apache.iceberg.spark.procedures;
+
+import java.lang.invoke.MethodHandle;
+import java.util.Collections;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.spark.MethodHandleUtil;
+import org.apache.iceberg.spark.source.SparkTable;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.ProcedureParameter;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+public class RollbackToSnapshotProcedure extends BaseProcedure {
+
+  private static final MethodHandle METHOD_HANDLE = MethodHandleUtil.methodHandle(
+      RollbackToSnapshotProcedure.class,
+      "rollbackToSnapshot",
+      String.class,
+      long.class);
+
+  private final ProcedureParameter[] parameters = new ProcedureParameter[]{
+      ProcedureParameter.required("table", DataTypes.StringType),
+      ProcedureParameter.required("snapshot_id", DataTypes.LongType)
+  };
+  private final StructField[] outputFields = new StructField[]{
+      new StructField("previous_current_snapshot_id", DataTypes.LongType, false, Metadata.empty()),
+      new StructField("current_snapshot_id", DataTypes.LongType, false, Metadata.empty())
+  };
+  private final StructType outputType = new StructType(outputFields);
+  private final MethodHandle methodHandle = METHOD_HANDLE.bindTo(this);
+
+  public RollbackToSnapshotProcedure(TableCatalog catalog) {
+    super(catalog);
+  }
+
+  public Iterable<Row> rollbackToSnapshot(String identAsString, long snapshotId) {
+    Identifier ident = parseIdentifier(identAsString);
+    SparkTable sparkTable = loadTable(ident);
+    Table icebergTable = sparkTable.table();
+
+    Snapshot previousCurrentSnapshot = icebergTable.currentSnapshot();
+
+    icebergTable.manageSnapshots()
+        .rollbackTo(snapshotId)
+        .commit();
+
+    refreshCache(ident, sparkTable);

Review comment:
       Let's discuss it in #1485.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to