This is an automated email from the ASF dual-hosted git repository.

ayushsaxena pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b2e21a93ef HIVE-27906: Iceberg: Implement Delete Orphan Files. (#4897). (Ayush Saxena, reviewed by zhangbutao)
6b2e21a93ef is described below

commit 6b2e21a93ef3c1776b689a7953fc59dbf52e4be4
Author: Ayush Saxena <ayushsax...@apache.org>
AuthorDate: Tue Nov 28 11:47:18 2023 +0530

    HIVE-27906: Iceberg: Implement Delete Orphan Files. (#4897). (Ayush Saxena, reviewed by zhangbutao)
---
 .../iceberg/mr/hive/HiveIcebergStorageHandler.java |  31 +++
 .../hive/actions/HiveIcebergDeleteOrphanFiles.java | 232 +++++++++++++++++++++
 .../mr/hive/TestHiveIcebergExpireSnapshots.java    |  55 +++++
 .../hadoop/hive/ql/parse/AlterClauseParser.g       |   2 +
 .../apache/hadoop/hive/ql/parse/HiveLexerParent.g  |   3 +
 .../table/execute/AlterTableExecuteAnalyzer.java   |  27 ++-
 .../hive/ql/parse/AlterTableExecuteSpec.java       |  29 ++-
 7 files changed, 377 insertions(+), 2 deletions(-)
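
For reference, the new EXECUTE clause introduced by this commit can be invoked as sketched below (syntax taken from the AlterClauseParser.g and test changes in this diff; the table name and timestamp literal are illustrative only):

    -- delete orphan files older than the default retention period (3 days)
    ALTER TABLE ice_tbl EXECUTE DELETE ORPHAN-FILES;

    -- delete orphan files older than an explicit timestamp
    ALTER TABLE ice_tbl EXECUTE DELETE ORPHAN-FILES OLDER THAN ('2023-11-25 00:00:00.000000000');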

diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index 10a00db534a..c729fcef430 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -158,6 +158,7 @@ import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TableScan;
+import org.apache.iceberg.actions.DeleteOrphanFiles;
 import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.expressions.Evaluator;
 import org.apache.iceberg.expressions.Expression;
@@ -171,6 +172,7 @@ import org.apache.iceberg.hive.HiveSchemaUtil;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.mr.Catalogs;
 import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.mr.hive.actions.HiveIcebergDeleteOrphanFiles;
 import org.apache.iceberg.puffin.Blob;
 import org.apache.iceberg.puffin.BlobMetadata;
 import org.apache.iceberg.puffin.Puffin;
@@ -849,12 +851,41 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
         IcebergTableUtil.performMetadataDelete(icebergTable, deleteMetadataSpec.getBranchName(),
             deleteMetadataSpec.getSarg());
         break;
+      case DELETE_ORPHAN_FILES:
+        int numDeleteThreads = conf.getInt(HiveConf.ConfVars.HIVE_ICEBERG_EXPIRE_SNAPSHOT_NUMTHREADS.varname,
+            HiveConf.ConfVars.HIVE_ICEBERG_EXPIRE_SNAPSHOT_NUMTHREADS.defaultIntVal);
+        AlterTableExecuteSpec.DeleteOrphanFilesDesc deleteOrphanFilesSpec =
+            (AlterTableExecuteSpec.DeleteOrphanFilesDesc) executeSpec.getOperationParams();
+        deleteOrphanFiles(icebergTable, deleteOrphanFilesSpec.getTimestampMillis(), numDeleteThreads);
+        break;
       default:
         throw new UnsupportedOperationException(
             String.format("Operation type %s is not supported", executeSpec.getOperationType().name()));
     }
   }
 
+  private void deleteOrphanFiles(Table icebergTable, long timestampMillis, int numThreads) {
+    ExecutorService deleteExecutorService = null;
+    try {
+      if (numThreads > 0) {
+        LOG.info("Executing delete orphan files on iceberg table {} with {} threads", icebergTable.name(), numThreads);
+        deleteExecutorService = getDeleteExecutorService(icebergTable.name(), numThreads);
+      }
+
+      HiveIcebergDeleteOrphanFiles deleteOrphanFiles = new HiveIcebergDeleteOrphanFiles(conf, icebergTable);
+      deleteOrphanFiles.olderThan(timestampMillis);
+      if (deleteExecutorService != null) {
+        deleteOrphanFiles.executeDeleteWith(deleteExecutorService);
+      }
+      DeleteOrphanFiles.Result result = deleteOrphanFiles.execute();
+      LOG.debug("Cleaned files {} for {}", result.orphanFileLocations(), icebergTable);
+    } finally {
+      if (deleteExecutorService != null) {
+        deleteExecutorService.shutdown();
+      }
+    }
+  }
+
   private void expireSnapshot(Table icebergTable, AlterTableExecuteSpec.ExpireSnapshotsSpec expireSnapshotsSpec,
       int numThreads) {
     ExecutorService deleteExecutorService = null;
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/actions/HiveIcebergDeleteOrphanFiles.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/actions/HiveIcebergDeleteOrphanFiles.java
new file mode 100644
index 00000000000..3c2e466208f
--- /dev/null
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/actions/HiveIcebergDeleteOrphanFiles.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.mr.hive.actions;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.URI;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.iceberg.DataTask;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.MetadataTableUtils;
+import org.apache.iceberg.ReachableFileUtil;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.actions.DeleteOrphanFiles;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.MetadataTableType.ALL_ENTRIES;
+
+public class HiveIcebergDeleteOrphanFiles implements DeleteOrphanFiles {
+
+  public static final String METADATA_FOLDER_NAME = "metadata";
+  public static final String DATA_FOLDER_NAME = "data";
+  private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergDeleteOrphanFiles.class);
+  private String tableLocation;
+  private long olderThanTimestamp = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(3);
+  private Consumer<String> deleteFunc;
+  private ExecutorService deleteExecutorService = MoreExecutors.newDirectExecutorService();
+
+  private final Configuration conf;
+  private final Table table;
+
+  public HiveIcebergDeleteOrphanFiles(Configuration conf, Table table) {
+    this.conf = conf;
+    this.table = table;
+    this.deleteFunc = file -> table.io().deleteFile(file);
+    this.tableLocation = table.location();
+  }
+
+  @Override
+  public HiveIcebergDeleteOrphanFiles location(String location) {
+    this.tableLocation = location;
+    return this;
+  }
+
+  @Override
+  public HiveIcebergDeleteOrphanFiles olderThan(long newOlderThanTimestamp) {
+    this.olderThanTimestamp = newOlderThanTimestamp;
+    return this;
+  }
+
+  // TODO: Implement later, if there is any use case.
+  @Override
+  public HiveIcebergDeleteOrphanFiles deleteWith(Consumer<String> newDeleteFunc) {
+    this.deleteFunc = newDeleteFunc;
+    return this;
+  }
+
+  @Override
+  public HiveIcebergDeleteOrphanFiles executeDeleteWith(ExecutorService executorService) {
+    this.deleteExecutorService = executorService;
+    return this;
+  }
+
+  @Override
+  public Result execute() {
+    LOG.info("Cleaning orphan files for {}", table.name());
+    HiveIcebergDeleteOrphanFilesResult result = new HiveIcebergDeleteOrphanFilesResult();
+    result.addDeletedFiles(cleanContentFiles(olderThanTimestamp));
+    result.addDeletedFiles(cleanMetadata(olderThanTimestamp));
+
+    LOG.debug("Deleting {} files while cleaning orphan files for {}", result.deletedFiles.size(), table.name());
+    Tasks.foreach(result.deletedFiles).executeWith(deleteExecutorService).retry(3)
+        .stopRetryOn(FileNotFoundException.class).suppressFailureWhenFinished().onFailure((file, thrown) ->
+          LOG.warn("Delete failed for file: {}", file, thrown)).run(deleteFunc::accept);
+    return result;
+  }
+
+  private Set<String> cleanContentFiles(long lastTime) {
+    Set<String> validFiles = Sets.union(getAllContentFilePath(), getAllStatisticsFilePath(table));
+    LOG.debug("Valid content files for {} are {}", table.name(), validFiles.size());
+    try {
+      Path dataPath = new Path(tableLocation, DATA_FOLDER_NAME);
+      return getFilesToBeDeleted(lastTime, validFiles, dataPath);
+    } catch (IOException ex) {
+      throw new UncheckedIOException(ex);
+    }
+  }
+
+  public Set<String> getAllContentFilePath() {
+    Set<String> validFilesPath = Sets.newHashSet();
+    Table metadataTable = getMetadataTable();
+
+    TableScan tableScan = metadataTable.newScan();
+    CloseableIterable<FileScanTask> manifestFileScanTasks = tableScan.planFiles();
+    CloseableIterable<StructLike> entries = CloseableIterable.concat(entriesOfManifest(manifestFileScanTasks));
+
+    for (StructLike entry : entries) {
+      StructLike fileRecord = entry.get(4, StructLike.class);
+      String filePath = fileRecord.get(1, String.class);
+      validFilesPath.add(getUriPath(filePath));
+    }
+    return validFilesPath;
+  }
+
+  private Iterable<CloseableIterable<StructLike>> entriesOfManifest(
+      CloseableIterable<FileScanTask> fileScanTasks) {
+    return Iterables.transform(
+        fileScanTasks,
+        task -> {
+          assert task != null;
+          return ((DataTask) task).rows();
+        });
+  }
+
+  public static Set<String> getAllStatisticsFilePath(Table table) {
+    return ReachableFileUtil.statisticsFilesLocations(table).stream().map(HiveIcebergDeleteOrphanFiles::getUriPath)
+        .collect(Collectors.toSet());
+  }
+
+  protected Set<String> cleanMetadata(long lastTime) {
+    LOG.info("Start cleaning metadata files for {}", table.name());
+    try {
+      Set<String> validFiles = getValidMetadataFiles(table);
+      LOG.debug("Valid metadata files for {} are {}", table.name(), validFiles);
+      Path metadataLocation = new Path(tableLocation, METADATA_FOLDER_NAME);
+      return getFilesToBeDeleted(lastTime, validFiles, metadataLocation);
+    } catch (IOException ioe) {
+      throw new UncheckedIOException(ioe);
+    }
+  }
+
+  private Set<String> getFilesToBeDeleted(long lastTime, Set<String> validFiles, Path location)
+      throws IOException {
+    Set<String> filesToDelete = Sets.newHashSet();
+    FileSystem fs = location.getFileSystem(conf);
+    RemoteIterator<LocatedFileStatus> metadataLocations = fs.listFiles(location, true);
+    while (metadataLocations.hasNext()) {
+      LocatedFileStatus metadataFile = metadataLocations.next();
+      if (metadataFile.getModificationTime() < lastTime && !validFiles.contains(
+          getUriPath(metadataFile.getPath().toString()))) {
+        filesToDelete.add(metadataFile.getPath().toString());
+      }
+    }
+    return filesToDelete;
+  }
+
+  private Table getMetadataTable() {
+    return MetadataTableUtils.createMetadataTableInstance(((HasTableOperations) table).operations(), table.name(),
+        table.name() + "#" + ALL_ENTRIES.name(), ALL_ENTRIES);
+  }
+
+  private static Set<String> getValidMetadataFiles(Table icebergTable) {
+    Set<String> validFiles = Sets.newHashSet();
+    Iterable<Snapshot> snapshots = icebergTable.snapshots();
+    for (Snapshot snapshot : snapshots) {
+      String manifestListLocation = snapshot.manifestListLocation();
+      validFiles.add(getUriPath(manifestListLocation));
+
+      List<ManifestFile> manifestFiles = snapshot.allManifests(icebergTable.io());
+      for (ManifestFile manifestFile : manifestFiles) {
+        validFiles.add(getUriPath(manifestFile.path()));
+      }
+    }
+    Stream.of(
+            ReachableFileUtil.metadataFileLocations(icebergTable, false).stream(),
+            ReachableFileUtil.statisticsFilesLocations(icebergTable).stream(),
+            Stream.of(ReachableFileUtil.versionHintLocation(icebergTable)))
+        .reduce(Stream::concat)
+        .orElse(Stream.empty())
+        .map(HiveIcebergDeleteOrphanFiles::getUriPath)
+        .forEach(validFiles::add);
+
+    return validFiles;
+  }
+
+  private static String getUriPath(String path) {
+    return URI.create(path).getPath();
+  }
+
+  static class HiveIcebergDeleteOrphanFilesResult implements Result {
+
+    private final Set<String> deletedFiles = Sets.newHashSet();
+
+    @Override
+    public Iterable<String> orphanFileLocations() {
+      return deletedFiles;
+    }
+
+    public void addDeletedFiles(Set<String> files) {
+      this.deletedFiles.addAll(files);
+    }
+  }
+}
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergExpireSnapshots.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergExpireSnapshots.java
index 0d65c6c7366..4a3b951bde4 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergExpireSnapshots.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergExpireSnapshots.java
@@ -22,10 +22,15 @@ package org.apache.iceberg.mr.hive;
 import java.io.IOException;
 import java.text.SimpleDateFormat;
 import java.util.Date;
+import java.util.List;
 import org.apache.commons.collections4.IterableUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.functional.RemoteIterators;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.Record;
 import org.junit.Assert;
 import org.junit.Assume;
 import org.junit.Test;
@@ -111,4 +116,54 @@ public class TestHiveIcebergExpireSnapshots extends HiveIcebergStorageHandlerWit
     table.refresh();
     Assert.assertEquals(5, IterableUtils.size(table.snapshots()));
   }
+
+  @Test
+  public void testDeleteOrphanFiles() throws IOException, InterruptedException {
+    TableIdentifier identifier = TableIdentifier.of("default", "source");
+    Table table =
+        testTables.createTableWithVersions(shell, identifier.name(), HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+            fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, 5);
+    Assert.assertEquals(5, table.history().size());
+
+    List<Object[]> rows = shell.executeStatement("SELECT * FROM " + identifier.name());
+    List<Record> originalRecords = HiveIcebergTestUtils.valueForRow(table.schema(), rows);
+    Path orphanDataFile = new Path(table.location(), "data/dataFile");
+    Path orphanMetadataFile = new Path(table.location(), "metadata/metafile");
+    FileSystem fs = orphanDataFile.getFileSystem(shell.getHiveConf());
+    fs.create(orphanDataFile).close();
+    fs.create(orphanMetadataFile).close();
+
+    int numDataFiles = RemoteIterators.toList(fs.listFiles(new Path(table.location(), "data"), true)).size();
+    int numMetadataFiles = RemoteIterators.toList(fs.listFiles(new Path(table.location(), "metadata"), true)).size();
+    shell.executeStatement("ALTER TABLE " + identifier.name() + " EXECUTE DELETE ORPHAN-FILES");
+
+    Assert.assertEquals(numDataFiles,
+        RemoteIterators.toList(fs.listFiles(new Path(table.location(), "data"), true)).size());
+
+    Assert.assertEquals(numMetadataFiles,
+        RemoteIterators.toList(fs.listFiles(new Path(table.location(), "metadata"), true)).size());
+
+    Assert.assertTrue(fs.exists(orphanDataFile));
+    Assert.assertTrue(fs.exists(orphanMetadataFile));
+
+    long time = System.currentTimeMillis() + 1000;
+    SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS000000");
+    String timeStamp = simpleDateFormat.format(new Date(time));
+    shell.executeStatement(
+        "ALTER TABLE " + identifier.name() + " EXECUTE DELETE ORPHAN-FILES OLDER THAN ('" + timeStamp + "')");
+
+    Assert.assertEquals(numDataFiles - 1,
+        RemoteIterators.toList(fs.listFiles(new Path(table.location(), "data"), true)).size());
+
+    Assert.assertEquals(numMetadataFiles - 1,
+        RemoteIterators.toList(fs.listFiles(new Path(table.location(), "metadata"), true)).size());
+
+    Assert.assertFalse(fs.exists(orphanDataFile));
+    Assert.assertFalse(fs.exists(orphanMetadataFile));
+    table.refresh();
+
+    rows = shell.executeStatement("SELECT * FROM " + identifier.name());
+    List<Record> records = HiveIcebergTestUtils.valueForRow(table.schema(), rows);
+    HiveIcebergTestUtils.validateData(originalRecords, records, 0);
+  }
 }
diff --git a/parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g b/parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g
index a184b41e0f4..3e6105957c0 100644
--- a/parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g
+++ b/parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g
@@ -487,6 +487,8 @@ alterStatementSuffixExecute
     -> ^(TOK_ALTERTABLE_EXECUTE KW_EXPIRE_SNAPSHOTS $fromTimestamp $toTimestamp)
     | KW_EXECUTE KW_EXPIRE_SNAPSHOTS KW_RETAIN KW_LAST numToRetain=Number
     -> ^(TOK_ALTERTABLE_EXECUTE KW_EXPIRE_SNAPSHOTS KW_RETAIN $numToRetain)
+    | KW_EXECUTE KW_DELETE KW_ORPHAN_FILES (KW_OLDER KW_THAN LPAREN (timestamp=StringLiteral) RPAREN)?
+    -> ^(TOK_ALTERTABLE_EXECUTE KW_ORPHAN_FILES $timestamp?)
     ;
 
 alterStatementSuffixDropBranch
diff --git a/parser/src/java/org/apache/hadoop/hive/ql/parse/HiveLexerParent.g b/parser/src/java/org/apache/hadoop/hive/ql/parse/HiveLexerParent.g
index 883b9774ffb..a26d66d214d 100644
--- a/parser/src/java/org/apache/hadoop/hive/ql/parse/HiveLexerParent.g
+++ b/parser/src/java/org/apache/hadoop/hive/ql/parse/HiveLexerParent.g
@@ -399,6 +399,9 @@ KW_RETENTION: 'RETENTION';
 KW_TAG: 'TAG';
 KW_FAST_FORWARD: 'FAST-FORWARD';
 KW_CHERRY_PICK: 'CHERRY-PICK';
+KW_ORPHAN_FILES: 'ORPHAN-FILES';
+KW_OLDER: 'OLDER';
+KW_THAN: 'THAN';
 
 // Operators
 // NOTE: if you add a new function/operator, add it to sysFuncNames so that describe function _FUNC_ will work.
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/execute/AlterTableExecuteAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/execute/AlterTableExecuteAnalyzer.java
index cdd6f035d4d..79448df3b2a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/execute/AlterTableExecuteAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/execute/AlterTableExecuteAnalyzer.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hive.ql.parse.ASTNode;
 import org.apache.hadoop.hive.ql.parse.HiveParser;
 import org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec;
 import org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec.CherryPickSpec;
+import org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec.DeleteOrphanFilesDesc;
 import org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec.ExpireSnapshotsSpec;
 import org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec.FastForwardSpec;
 import org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec.RollbackSpec;
@@ -45,9 +46,11 @@ import org.apache.hadoop.hive.ql.session.SessionState;
 import java.time.ZoneId;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 
 import static org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec.ExecuteOperationType.CHERRY_PICK;
+import static org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec.ExecuteOperationType.DELETE_ORPHAN_FILES;
 import static org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec.ExecuteOperationType.EXPIRE_SNAPSHOT;
 import static org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec.ExecuteOperationType.FAST_FORWARD;
 import static org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec.ExecuteOperationType.ROLLBACK;
@@ -83,7 +86,6 @@ public class AlterTableExecuteAnalyzer extends AbstractAlterTableAnalyzer {
         break;
       case HiveParser.KW_EXPIRE_SNAPSHOTS:
         desc = getExpireSnapshotDesc(tableName, partitionSpec,  command.getChildren());
-
         break;
       case HiveParser.KW_SET_CURRENT_SNAPSHOT:
         desc = getSetCurrentSnapshotDesc(tableName, partitionSpec, (ASTNode) command.getChild(1));
@@ -94,6 +96,9 @@ public class AlterTableExecuteAnalyzer extends AbstractAlterTableAnalyzer {
       case HiveParser.KW_CHERRY_PICK:
         desc = getCherryPickDesc(tableName, partitionSpec, (ASTNode) command.getChild(1));
         break;
+      case HiveParser.KW_ORPHAN_FILES:
+        desc = getDeleteOrphanFilesDesc(tableName, partitionSpec, command.getChildren());
+        break;
     }
 
     rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), desc)));
@@ -178,4 +183,24 @@ public class AlterTableExecuteAnalyzer extends AbstractAlterTableAnalyzer {
     }
     return new AlterTableExecuteDesc(tableName, partitionSpec, spec);
   }
+
+  private static AlterTableExecuteDesc getDeleteOrphanFilesDesc(TableName tableName, Map<String, String> partitionSpec,
+      List<Node> children) throws SemanticException {
+
+    long time = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(3);
+    if (children.size() == 2) {
+      time = getTimeStampMillis((ASTNode) children.get(1));
+    }
+    AlterTableExecuteSpec spec = new AlterTableExecuteSpec(DELETE_ORPHAN_FILES, new DeleteOrphanFilesDesc(time));
+    return new AlterTableExecuteDesc(tableName, partitionSpec, spec);
+  }
+
+  private static long getTimeStampMillis(ASTNode childNode) {
+    String childNodeText = PlanUtils.stripQuotes(childNode.getText());
+    ZoneId timeZone = SessionState.get() == null ?
+        new HiveConf().getLocalTimeZone() :
+        SessionState.get().getConf().getLocalTimeZone();
+    TimestampTZ time = TimestampTZUtil.parse(PlanUtils.stripQuotes(childNodeText), timeZone);
+    return time.toEpochMilli();
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/AlterTableExecuteSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/AlterTableExecuteSpec.java
index 2b7ca285e1c..54c8df3573c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/AlterTableExecuteSpec.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/AlterTableExecuteSpec.java
@@ -19,6 +19,8 @@
 package org.apache.hadoop.hive.ql.parse;
 
 import com.google.common.base.MoreObjects;
+import com.google.common.base.Preconditions;
+
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 
 import java.util.Arrays;
@@ -40,7 +42,8 @@ public class AlterTableExecuteSpec<T> {
     SET_CURRENT_SNAPSHOT,
     FAST_FORWARD,
     CHERRY_PICK,
-    DELETE_METADATA;
+    DELETE_METADATA,
+    DELETE_ORPHAN_FILES;
   }
 
   private final ExecuteOperationType operationType;
@@ -270,4 +273,28 @@ public class AlterTableExecuteSpec<T> {
       return sarg;
     }
   }
+
+  /**
+   * Value object class, that stores the delete orphan files operation specific parameters.
+   * <ul>
+   *   <li>timestampMillis: only files older than this timestamp are considered for deletion</li>
+   * </ul>
+   */
+  public static class DeleteOrphanFilesDesc {
+    private final long timestampMillis;
+
+    public DeleteOrphanFilesDesc(long timestampMillis) {
+      Preconditions.checkArgument(timestampMillis >= 0, "TimeStamp Millis shouldn't be negative");
+      this.timestampMillis = timestampMillis;
+    }
+
+    public long getTimestampMillis() {
+      return timestampMillis;
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(this).add("timestampMillis", timestampMillis).toString();
+    }
+  }
 }
