This is an automated email from the ASF dual-hosted git repository.

ayushsaxena pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
     new 6b2e21a93ef HIVE-27906: Iceberg: Implement Delete Orphan Files. (#4897). (Ayush Saxena, reviewed by zhangbutao)
6b2e21a93ef is described below

commit 6b2e21a93ef3c1776b689a7953fc59dbf52e4be4
Author: Ayush Saxena <ayushsax...@apache.org>
AuthorDate: Tue Nov 28 11:47:18 2023 +0530

    HIVE-27906: Iceberg: Implement Delete Orphan Files. (#4897). (Ayush Saxena, reviewed by zhangbutao)
---
 .../iceberg/mr/hive/HiveIcebergStorageHandler.java |  31 +++
 .../hive/actions/HiveIcebergDeleteOrphanFiles.java | 232 +++++++++++++++++++++
 .../mr/hive/TestHiveIcebergExpireSnapshots.java    |  55 +++++
 .../hadoop/hive/ql/parse/AlterClauseParser.g       |   2 +
 .../apache/hadoop/hive/ql/parse/HiveLexerParent.g  |   3 +
 .../table/execute/AlterTableExecuteAnalyzer.java   |  27 ++-
 .../hive/ql/parse/AlterTableExecuteSpec.java       |  29 ++-
 7 files changed, 377 insertions(+), 2 deletions(-)

diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index 10a00db534a..c729fcef430 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -158,6 +158,7 @@ import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TableScan;
+import org.apache.iceberg.actions.DeleteOrphanFiles;
 import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.expressions.Evaluator;
 import org.apache.iceberg.expressions.Expression;
@@ -171,6 +172,7 @@ import org.apache.iceberg.hive.HiveSchemaUtil;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.mr.Catalogs;
 import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.mr.hive.actions.HiveIcebergDeleteOrphanFiles;
 import org.apache.iceberg.puffin.Blob;
 import org.apache.iceberg.puffin.BlobMetadata;
 import org.apache.iceberg.puffin.Puffin;
@@ -849,12 +851,41 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
         IcebergTableUtil.performMetadataDelete(icebergTable, deleteMetadataSpec.getBranchName(),
             deleteMetadataSpec.getSarg());
         break;
+      case DELETE_ORPHAN_FILES:
+        int numDeleteThreads = conf.getInt(HiveConf.ConfVars.HIVE_ICEBERG_EXPIRE_SNAPSHOT_NUMTHREADS.varname,
+            HiveConf.ConfVars.HIVE_ICEBERG_EXPIRE_SNAPSHOT_NUMTHREADS.defaultIntVal);
+        AlterTableExecuteSpec.DeleteOrphanFilesDesc deleteOrphanFilesSpec =
+            (AlterTableExecuteSpec.DeleteOrphanFilesDesc) executeSpec.getOperationParams();
+        deleteOrphanFiles(icebergTable, deleteOrphanFilesSpec.getTimestampMillis(), numDeleteThreads);
+        break;
       default:
         throw new UnsupportedOperationException(
             String.format("Operation type %s is not supported", executeSpec.getOperationType().name()));
     }
   }
 
+  private void deleteOrphanFiles(Table icebergTable, long timestampMillis, int numThreads) {
+    ExecutorService deleteExecutorService = null;
+    try {
+      if (numThreads > 0) {
+        LOG.info("Executing delete orphan files on iceberg table {} with {} threads", icebergTable.name(), numThreads);
+        deleteExecutorService = getDeleteExecutorService(icebergTable.name(), numThreads);
+      }
+
+      HiveIcebergDeleteOrphanFiles deleteOrphanFiles = new HiveIcebergDeleteOrphanFiles(conf, icebergTable);
+      deleteOrphanFiles.olderThan(timestampMillis);
+      if (deleteExecutorService != null) {
+        deleteOrphanFiles.executeDeleteWith(deleteExecutorService);
+      }
+      DeleteOrphanFiles.Result result = deleteOrphanFiles.execute();
+      LOG.debug("Cleaned files {} for {}", result.orphanFileLocations(), icebergTable);
+    } finally {
+      if (deleteExecutorService != null) {
+        deleteExecutorService.shutdown();
+      }
+    }
+  }
+
   private void expireSnapshot(Table icebergTable, AlterTableExecuteSpec.ExpireSnapshotsSpec expireSnapshotsSpec,
       int numThreads) {
     ExecutorService deleteExecutorService = null;
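Note that the new DELETE_ORPHAN_FILES branch reuses the expire-snapshots thread-pool setting instead of adding a new knob, so one property sizes the delete pool for both maintenance commands; a non-positive value leaves the action on its default direct executor. A hedged session sketch (the concrete property key is whatever HIVE_ICEBERG_EXPIRE_SNAPSHOT_NUMTHREADS.varname resolves to; the key shown here is an assumption, verify it against HiveConf in your build):

    -- assumed key; check HiveConf.ConfVars.HIVE_ICEBERG_EXPIRE_SNAPSHOT_NUMTHREADS.varname
    SET hive.iceberg.expire.snapshot.numthreads=4;
    ALTER TABLE default.source EXECUTE DELETE ORPHAN-FILES;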
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/actions/HiveIcebergDeleteOrphanFiles.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/actions/HiveIcebergDeleteOrphanFiles.java
new file mode 100644
index 00000000000..3c2e466208f
--- /dev/null
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/actions/HiveIcebergDeleteOrphanFiles.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.mr.hive.actions;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.URI;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.iceberg.DataTask;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.MetadataTableUtils;
+import org.apache.iceberg.ReachableFileUtil;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.actions.DeleteOrphanFiles;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.MetadataTableType.ALL_ENTRIES;
+
+public class HiveIcebergDeleteOrphanFiles implements DeleteOrphanFiles {
+
+  public static final String METADATA_FOLDER_NAME = "metadata";
+  public static final String DATA_FOLDER_NAME = "data";
+
+  private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergDeleteOrphanFiles.class);
+
+  private String tableLocation;
+  private long olderThanTimestamp = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(3);
+  private Consumer<String> deleteFunc;
+  private ExecutorService deleteExecutorService = MoreExecutors.newDirectExecutorService();
+
+  private final Configuration conf;
+  private final Table table;
+
+  public HiveIcebergDeleteOrphanFiles(Configuration conf, Table table) {
+    this.conf = conf;
+    this.table = table;
+    this.deleteFunc = file -> table.io().deleteFile(file);
+    this.tableLocation = table.location();
+  }
+
+  @Override
+  public HiveIcebergDeleteOrphanFiles location(String location) {
+    this.tableLocation = location;
+    return this;
+  }
+
+  @Override
+  public HiveIcebergDeleteOrphanFiles olderThan(long newOlderThanTimestamp) {
+    this.olderThanTimestamp = newOlderThanTimestamp;
+    return this;
+  }
+
+  // TODO: Implement later, if there is any use case.
+  @Override
+  public HiveIcebergDeleteOrphanFiles deleteWith(Consumer<String> newDeleteFunc) {
+    this.deleteFunc = newDeleteFunc;
+    return this;
+  }
+
+  @Override
+  public HiveIcebergDeleteOrphanFiles executeDeleteWith(ExecutorService executorService) {
+    this.deleteExecutorService = executorService;
+    return this;
+  }
+
+  @Override
+  public Result execute() {
+    LOG.info("Cleaning orphan files for {}", table.name());
+    HiveIcebergDeleteOrphanFilesResult result = new HiveIcebergDeleteOrphanFilesResult();
+    result.addDeletedFiles(cleanContentFiles(olderThanTimestamp));
+    result.addDeletedFiles(cleanMetadata(olderThanTimestamp));
+
+    LOG.debug("Deleting {} files while cleaning orphan files for {}", result.deletedFiles.size(), table.name());
+    Tasks.foreach(result.deletedFiles).executeWith(deleteExecutorService).retry(3)
+        .stopRetryOn(FileNotFoundException.class).suppressFailureWhenFinished()
+        .onFailure((file, thrown) -> LOG.warn("Delete failed for file: {}", file, thrown))
+        .run(deleteFunc::accept);
+    return result;
+  }
+
+  private Set<String> cleanContentFiles(long lastTime) {
+    Set<String> validFiles = Sets.union(getAllContentFilePath(), getAllStatisticsFilePath(table));
+    LOG.debug("Valid content files for {} are {}", table.name(), validFiles.size());
+    try {
+      Path dataPath = new Path(tableLocation, DATA_FOLDER_NAME);
+      return getFilesToBeDeleted(lastTime, validFiles, dataPath);
+    } catch (IOException ex) {
+      throw new UncheckedIOException(ex);
+    }
+  }
+
+  public Set<String> getAllContentFilePath() {
+    Set<String> validFilesPath = Sets.newHashSet();
+    Table metadataTable = getMetadataTable();
+
+    TableScan tableScan = metadataTable.newScan();
+    CloseableIterable<FileScanTask> manifestFileScanTasks = tableScan.planFiles();
+    CloseableIterable<StructLike> entries = CloseableIterable.concat(entriesOfManifest(manifestFileScanTasks));
+
+    for (StructLike entry : entries) {
+      StructLike fileRecord = entry.get(4, StructLike.class);
+      String filePath = fileRecord.get(1, String.class);
+      validFilesPath.add(getUriPath(filePath));
+    }
+    return validFilesPath;
+  }
+
+  private Iterable<CloseableIterable<StructLike>> entriesOfManifest(
+      CloseableIterable<FileScanTask> fileScanTasks) {
+    return Iterables.transform(
+        fileScanTasks,
+        task -> {
+          assert task != null;
+          return ((DataTask) task).rows();
+        });
+  }
+
+  public static Set<String> getAllStatisticsFilePath(Table table) {
+    return ReachableFileUtil.statisticsFilesLocations(table).stream().map(HiveIcebergDeleteOrphanFiles::getUriPath)
+        .collect(Collectors.toSet());
+  }
+
+  protected Set<String> cleanMetadata(long lastTime) {
+    LOG.info("Cleaning metadata files for {}", table.name());
+    try {
+      Set<String> validFiles = getValidMetadataFiles(table);
+      LOG.debug("Valid metadata files for {} are {}", table.name(), validFiles);
+      Path metadataLocation = new Path(tableLocation, METADATA_FOLDER_NAME);
+      return getFilesToBeDeleted(lastTime, validFiles, metadataLocation);
+    } catch (IOException ioe) {
+      throw new UncheckedIOException(ioe);
+    }
+  }
+
+  private Set<String> getFilesToBeDeleted(long lastTime, Set<String> validFiles, Path location)
+      throws IOException {
+    Set<String> filesToDelete = Sets.newHashSet();
+    FileSystem fs = location.getFileSystem(conf);
+    RemoteIterator<LocatedFileStatus> metadataLocations = fs.listFiles(location, true);
+    while (metadataLocations.hasNext()) {
+      LocatedFileStatus metadataFile = metadataLocations.next();
+      if (metadataFile.getModificationTime() < lastTime && !validFiles.contains(
+          getUriPath(metadataFile.getPath().toString()))) {
+        filesToDelete.add(metadataFile.getPath().toString());
+      }
+    }
+    return filesToDelete;
+  }
+
+  private Table getMetadataTable() {
+    return MetadataTableUtils.createMetadataTableInstance(((HasTableOperations) table).operations(), table.name(),
+        table.name() + "#" + ALL_ENTRIES.name(), ALL_ENTRIES);
+  }
+
+  private static Set<String> getValidMetadataFiles(Table icebergTable) {
+    Set<String> validFiles = Sets.newHashSet();
+    Iterable<Snapshot> snapshots = icebergTable.snapshots();
+    for (Snapshot snapshot : snapshots) {
+      String manifestListLocation = snapshot.manifestListLocation();
+      validFiles.add(getUriPath(manifestListLocation));
+
+      List<ManifestFile> manifestFiles = snapshot.allManifests(icebergTable.io());
+      for (ManifestFile manifestFile : manifestFiles) {
+        validFiles.add(getUriPath(manifestFile.path()));
+      }
+    }
+    Stream.of(
+            ReachableFileUtil.metadataFileLocations(icebergTable, false).stream(),
+            ReachableFileUtil.statisticsFilesLocations(icebergTable).stream(),
+            Stream.of(ReachableFileUtil.versionHintLocation(icebergTable)))
+        .reduce(Stream::concat)
+        .orElse(Stream.empty())
+        .map(HiveIcebergDeleteOrphanFiles::getUriPath)
+        .forEach(validFiles::add);
+
+    return validFiles;
+  }
+
+  private static String getUriPath(String path) {
+    return URI.create(path).getPath();
+  }
+
+  static class HiveIcebergDeleteOrphanFilesResult implements Result {
+
+    private final Set<String> deletedFiles = Sets.newHashSet();
+
+    @Override
+    public Iterable<String> orphanFileLocations() {
+      return deletedFiles;
+    }
+
+    public void addDeletedFiles(Set<String> files) {
+      this.deletedFiles.addAll(files);
+    }
+  }
+}
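Because the class implements Iceberg's DeleteOrphanFiles builder interface, it can also be driven programmatically, mirroring what the storage handler does above. A minimal sketch, assuming an already-resolved Table handle and Hadoop Configuration (the wrapper class and pool size are illustrative, not part of this change):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.actions.DeleteOrphanFiles;
    import org.apache.iceberg.mr.hive.actions.HiveIcebergDeleteOrphanFiles;

    public class OrphanFileCleanup {
      // Deletes files under the table location that no snapshot references
      // and that are older than the cutoff; returns the deleted locations.
      public static DeleteOrphanFiles.Result clean(Configuration conf, Table table) {
        ExecutorService pool = Executors.newFixedThreadPool(4); // optional; default is a direct executor
        try {
          return new HiveIcebergDeleteOrphanFiles(conf, table)
              .olderThan(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(3)) // same default as the class
              .executeDeleteWith(pool)
              .execute();
        } finally {
          pool.shutdown();
        }
      }
    }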
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergExpireSnapshots.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergExpireSnapshots.java
index 0d65c6c7366..4a3b951bde4 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergExpireSnapshots.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergExpireSnapshots.java
@@ -22,10 +22,15 @@ package org.apache.iceberg.mr.hive;
 import java.io.IOException;
 import java.text.SimpleDateFormat;
 import java.util.Date;
+import java.util.List;
 import org.apache.commons.collections4.IterableUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.functional.RemoteIterators;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.Record;
 import org.junit.Assert;
 import org.junit.Assume;
 import org.junit.Test;
@@ -111,4 +116,54 @@ public class TestHiveIcebergExpireSnapshots extends HiveIcebergStorageHandlerWit
     table.refresh();
     Assert.assertEquals(5, IterableUtils.size(table.snapshots()));
   }
+
+  @Test
+  public void testDeleteOrphanFiles() throws IOException, InterruptedException {
+    TableIdentifier identifier = TableIdentifier.of("default", "source");
+    Table table = testTables.createTableWithVersions(shell, identifier.name(),
+        HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, fileFormat,
+        HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, 5);
+    Assert.assertEquals(5, table.history().size());
+
+    List<Object[]> rows = shell.executeStatement("SELECT * FROM " + identifier.name());
+    List<Record> originalRecords = HiveIcebergTestUtils.valueForRow(table.schema(), rows);
+
+    Path orphanDataFile = new Path(table.location(), "data/dataFile");
+    Path orphanMetadataFile = new Path(table.location(), "metadata/metafile");
+    FileSystem fs = orphanDataFile.getFileSystem(shell.getHiveConf());
+    fs.create(orphanDataFile).close();
+    fs.create(orphanMetadataFile).close();
+
+    int numDataFiles = RemoteIterators.toList(fs.listFiles(new Path(table.location(), "data"), true)).size();
+    int numMetadataFiles = RemoteIterators.toList(fs.listFiles(new Path(table.location(), "metadata"), true)).size();
+
+    shell.executeStatement("ALTER TABLE " + identifier.name() + " EXECUTE DELETE ORPHAN-FILES");
+
+    Assert.assertEquals(numDataFiles,
+        RemoteIterators.toList(fs.listFiles(new Path(table.location(), "data"), true)).size());
+    Assert.assertEquals(numMetadataFiles,
+        RemoteIterators.toList(fs.listFiles(new Path(table.location(), "metadata"), true)).size());
+    Assert.assertTrue(fs.exists(orphanDataFile));
+    Assert.assertTrue(fs.exists(orphanMetadataFile));
+
+    long time = System.currentTimeMillis() + 1000;
+    SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS000000");
+    String timeStamp = simpleDateFormat.format(new Date(time));
+    shell.executeStatement(
+        "ALTER TABLE " + identifier.name() + " EXECUTE DELETE ORPHAN-FILES OLDER THAN ('" + timeStamp + "')");
+
+    Assert.assertEquals(numDataFiles - 1,
+        RemoteIterators.toList(fs.listFiles(new Path(table.location(), "data"), true)).size());
+    Assert.assertEquals(numMetadataFiles - 1,
+        RemoteIterators.toList(fs.listFiles(new Path(table.location(), "metadata"), true)).size());
+    Assert.assertFalse(fs.exists(orphanDataFile));
+    Assert.assertFalse(fs.exists(orphanMetadataFile));
+
+    table.refresh();
+    rows = shell.executeStatement("SELECT * FROM " + identifier.name());
+    List<Record> records = HiveIcebergTestUtils.valueForRow(table.schema(), rows);
+    HiveIcebergTestUtils.validateData(originalRecords, records, 0);
+  }
 }
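Spelled out, the two statement shapes the test drives through shell.executeStatement() are:

    ALTER TABLE source EXECUTE DELETE ORPHAN-FILES
    ALTER TABLE source EXECUTE DELETE ORPHAN-FILES OLDER THAN ('2023-11-28 11:47:18.000000000')

The first form falls back to a default cutoff of now minus three days, which is why the freshly created orphans above survive it; the second removes unreferenced files older than the quoted timestamp, formatted here with the same yyyy-MM-dd HH:mm:ss.SSS000000 pattern the test uses.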
diff --git a/parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g b/parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g
index a184b41e0f4..3e6105957c0 100644
--- a/parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g
+++ b/parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g
@@ -487,6 +487,8 @@ alterStatementSuffixExecute
     -> ^(TOK_ALTERTABLE_EXECUTE KW_EXPIRE_SNAPSHOTS $fromTimestamp $toTimestamp)
     | KW_EXECUTE KW_EXPIRE_SNAPSHOTS KW_RETAIN KW_LAST numToRetain=Number
     -> ^(TOK_ALTERTABLE_EXECUTE KW_EXPIRE_SNAPSHOTS KW_RETAIN $numToRetain)
+    | KW_EXECUTE KW_DELETE KW_ORPHAN_FILES (KW_OLDER KW_THAN LPAREN (timestamp=StringLiteral) RPAREN)?
+    -> ^(TOK_ALTERTABLE_EXECUTE KW_ORPHAN_FILES $timestamp?)
     ;
 
 alterStatementSuffixDropBranch
diff --git a/parser/src/java/org/apache/hadoop/hive/ql/parse/HiveLexerParent.g b/parser/src/java/org/apache/hadoop/hive/ql/parse/HiveLexerParent.g
index 883b9774ffb..a26d66d214d 100644
--- a/parser/src/java/org/apache/hadoop/hive/ql/parse/HiveLexerParent.g
+++ b/parser/src/java/org/apache/hadoop/hive/ql/parse/HiveLexerParent.g
@@ -399,6 +399,9 @@ KW_RETENTION: 'RETENTION';
 KW_TAG: 'TAG';
 KW_FAST_FORWARD: 'FAST-FORWARD';
 KW_CHERRY_PICK: 'CHERRY-PICK';
+KW_ORPHAN_FILES: 'ORPHAN-FILES';
+KW_OLDER: 'OLDER';
+KW_THAN: 'THAN';
 
 // Operators
 // NOTE: if you add a new function/operator, add it to sysFuncNames so that describe function _FUNC_ will work.
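For orientation, the rewrite rule above emits a TOK_ALTERTABLE_EXECUTE node whose first child is the KW_ORPHAN_FILES token and whose optional second child is the timestamp literal; this is exactly the children.size() == 2 case the analyzer below checks:

    TOK_ALTERTABLE_EXECUTE
    +-- KW_ORPHAN_FILES
    +-- StringLiteral        (present only when OLDER THAN (...) is given)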
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/execute/AlterTableExecuteAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/execute/AlterTableExecuteAnalyzer.java
index cdd6f035d4d..79448df3b2a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/execute/AlterTableExecuteAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/execute/AlterTableExecuteAnalyzer.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hive.ql.parse.ASTNode;
 import org.apache.hadoop.hive.ql.parse.HiveParser;
 import org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec;
 import org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec.CherryPickSpec;
+import org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec.DeleteOrphanFilesDesc;
 import org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec.ExpireSnapshotsSpec;
 import org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec.FastForwardSpec;
 import org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec.RollbackSpec;
@@ -45,9 +46,11 @@ import org.apache.hadoop.hive.ql.session.SessionState;
 import java.time.ZoneId;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 
 import static org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec.ExecuteOperationType.CHERRY_PICK;
+import static org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec.ExecuteOperationType.DELETE_ORPHAN_FILES;
 import static org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec.ExecuteOperationType.EXPIRE_SNAPSHOT;
 import static org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec.ExecuteOperationType.FAST_FORWARD;
 import static org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec.ExecuteOperationType.ROLLBACK;
@@ -83,7 +86,6 @@ public class AlterTableExecuteAnalyzer extends AbstractAlterTableAnalyzer {
         break;
       case HiveParser.KW_EXPIRE_SNAPSHOTS:
         desc = getExpireSnapshotDesc(tableName, partitionSpec, command.getChildren());
-
         break;
       case HiveParser.KW_SET_CURRENT_SNAPSHOT:
         desc = getSetCurrentSnapshotDesc(tableName, partitionSpec, (ASTNode) command.getChild(1));
@@ -94,6 +96,9 @@ public class AlterTableExecuteAnalyzer extends AbstractAlterTableAnalyzer {
       case HiveParser.KW_CHERRY_PICK:
         desc = getCherryPickDesc(tableName, partitionSpec, (ASTNode) command.getChild(1));
         break;
+      case HiveParser.KW_ORPHAN_FILES:
+        desc = getDeleteOrphanFilesDesc(tableName, partitionSpec, command.getChildren());
+        break;
     }
 
     rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), desc)));
@@ -178,4 +183,24 @@ public class AlterTableExecuteAnalyzer extends AbstractAlterTableAnalyzer {
     }
     return new AlterTableExecuteDesc(tableName, partitionSpec, spec);
   }
+
+  private static AlterTableExecuteDesc getDeleteOrphanFilesDesc(TableName tableName, Map<String, String> partitionSpec,
+      List<Node> children) throws SemanticException {
+
+    long time = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(3);
+    if (children.size() == 2) {
+      time = getTimeStampMillis((ASTNode) children.get(1));
+    }
+    AlterTableExecuteSpec spec = new AlterTableExecuteSpec(DELETE_ORPHAN_FILES, new DeleteOrphanFilesDesc(time));
+    return new AlterTableExecuteDesc(tableName, partitionSpec, spec);
+  }
+
+  private static long getTimeStampMillis(ASTNode childNode) {
+    String childNodeText = PlanUtils.stripQuotes(childNode.getText());
+    ZoneId timeZone = SessionState.get() == null ?
+        new HiveConf().getLocalTimeZone() :
+        SessionState.get().getConf().getLocalTimeZone();
+    TimestampTZ time = TimestampTZUtil.parse(childNodeText, timeZone);
+    return time.toEpochMilli();
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/AlterTableExecuteSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/AlterTableExecuteSpec.java
index 2b7ca285e1c..54c8df3573c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/AlterTableExecuteSpec.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/AlterTableExecuteSpec.java
@@ -19,6 +19,8 @@
 package org.apache.hadoop.hive.ql.parse;
 
 import com.google.common.base.MoreObjects;
+import com.google.common.base.Preconditions;
+
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 
 import java.util.Arrays;
@@ -40,7 +42,8 @@ public class AlterTableExecuteSpec<T> {
     SET_CURRENT_SNAPSHOT,
     FAST_FORWARD,
     CHERRY_PICK,
-    DELETE_METADATA;
+    DELETE_METADATA,
+    DELETE_ORPHAN_FILES;
   }
 
   private final ExecuteOperationType operationType;
@@ -270,4 +273,28 @@ public class AlterTableExecuteSpec<T> {
       return sarg;
     }
   }
+
+  /**
+   * Value object class that stores the delete-orphan-files operation specific parameters.
+   * <ul>
+   *   <li>timestampMillis: files older than this timestamp are considered orphan and may be deleted</li>
+   * </ul>
+   */
+  public static class DeleteOrphanFilesDesc {
+    private final long timestampMillis;
+
+    public DeleteOrphanFilesDesc(long timestampMillis) {
+      Preconditions.checkArgument(timestampMillis >= 0, "timestampMillis shouldn't be negative");
+      this.timestampMillis = timestampMillis;
+    }
+
+    public long getTimestampMillis() {
+      return timestampMillis;
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(this).add("timestampMillis", timestampMillis).toString();
+    }
+  }
 }
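As a worked example of the two paths in getDeleteOrphanFilesDesc above: without OLDER THAN the cutoff is simply now minus three days; with it, the quoted literal is parsed in the session's local time zone and converted to epoch millis. A hedged sketch using the same utilities (the zone and literal here are illustrative):

    import java.time.ZoneId;
    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.hive.common.type.TimestampTZ;
    import org.apache.hadoop.hive.common.type.TimestampTZUtil;

    public class CutoffDemo {
      public static void main(String[] args) {
        // default path: ALTER TABLE ... EXECUTE DELETE ORPHAN-FILES
        long defaultCutoff = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(3);
        // explicit path: ... OLDER THAN ('2023-11-28 11:47:18.000000000')
        TimestampTZ ts = TimestampTZUtil.parse("2023-11-28 11:47:18.000000000", ZoneId.of("Asia/Kolkata"));
        System.out.printf("default=%d explicit=%d%n", defaultCutoff, ts.toEpochMilli());
      }
    }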