This is an automated email from the ASF dual-hosted git repository.

arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
     new 9f7c58f  DRILL-7544: Upgrade Iceberg version to support Parquet 1.11.0
9f7c58f is described below

commit 9f7c58fc5a47cfe67e5fb5f5f35971cf534133d5
Author: Arina Ielchiieva <arina.yelchiy...@gmail.com>
AuthorDate: Wed Jan 22 15:55:09 2020 +0200

    DRILL-7544: Upgrade Iceberg version to support Parquet 1.11.0

    1. Upgraded Iceberg to the commit that supports Parquet 1.11.0.
    2. Removed workaround in ExpirationHandler and used built-in logic from Iceberg library.
    3. Updated description about expiration logic in README.md.
---
 metastore/iceberg-metastore/README.md              |  10 +-
 metastore/iceberg-metastore/pom.xml                |  12 +-
 .../drill/metastore/iceberg/IcebergMetastore.java  |   6 +-
 .../iceberg/components/tables/IcebergTables.java   |   4 +-
 .../iceberg/config/IcebergConfigConstants.java     |   5 -
 .../iceberg/operate/ExpirationHandler.java         | 263 +++------------------
 .../metastore/iceberg/operate/IcebergModify.java   |   5 +-
 .../src/main/resources/drill-metastore-module.conf |  14 +-
 .../iceberg/operate/TestExpirationHandler.java     | 149 +++++-------
 9 files changed, 119 insertions(+), 349 deletions(-)

diff --git a/metastore/iceberg-metastore/README.md b/metastore/iceberg-metastore/README.md
index 3db08d1..094154b 100644
--- a/metastore/iceberg-metastore/README.md
+++ b/metastore/iceberg-metastore/README.md
@@ -183,8 +183,8 @@ expiration process is launched.
 Iceberg table generates metadata for each modification operation:
 snapshot, manifest file, table metadata file. Also when performing delete operation,
 previously stored data files are not deleted. These files with the time
-can occupy lots of space. `ExpirationHandler` allows to expire outdated metadata and
-data files after configured time period (`drill.metastore.iceberg.expiration.period`).
-If expiration period is not indicated, zero or negative, expiration won't be performed.
-`ExpirationHandler` is called after each modification operation, it checks if expiration period
-has elapsed and submits expiration process in a separate thread.
+can occupy lots of space. Two table properties, `write.metadata.delete-after-commit.enabled`
+and `write.metadata.previous-versions-max`, control the expiration process.
+Metadata files are expired automatically when `write.metadata.delete-after-commit.enabled`
+is enabled. Snapshots and data files are expired by `ExpirationHandler`
+after each commit operation, based on the same table property values.
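The two properties named above are standard Iceberg table properties rather than Drill-specific settings, so the same behaviour can also be reproduced directly through the Iceberg API. The sketch below is illustrative only and is not part of this commit; the class and method names are made up for the example, while the property constants and the updateProperties() call are the regular Iceberg API that Drill's configuration maps onto.

import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;

class MetadataExpirationExample {

  // Illustrative sketch: enable automatic metadata file cleanup on an Iceberg
  // table and keep only the two most recent metadata versions, mirroring the
  // defaults Drill sets via drill.metastore.iceberg.components.common.properties.
  static void enableMetadataExpiration(Table table) {
    table.updateProperties()
        .set(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, "true")
        .set(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, "2")
        .commit();
  }
}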
diff --git a/metastore/iceberg-metastore/pom.xml b/metastore/iceberg-metastore/pom.xml
index 818ea08..aa68271 100644
--- a/metastore/iceberg-metastore/pom.xml
+++ b/metastore/iceberg-metastore/pom.xml
@@ -31,7 +31,7 @@
   <name>metastore/Drill Iceberg Metastore</name>
 
   <properties>
-    <iceberg.version>0.7.0-incubating</iceberg.version>
+    <iceberg.version>2d75130</iceberg.version>
     <caffeine.version>2.7.0</caffeine.version>
   </properties>
 
@@ -47,27 +47,27 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
-      <groupId>org.apache.iceberg</groupId>
+      <groupId>com.github.apache.incubator-iceberg</groupId>
       <artifactId>iceberg-parquet</artifactId>
       <version>${iceberg.version}</version>
     </dependency>
     <dependency>
-      <groupId>org.apache.iceberg</groupId>
+      <groupId>com.github.apache.incubator-iceberg</groupId>
       <artifactId>iceberg-data</artifactId>
       <version>${iceberg.version}</version>
     </dependency>
     <dependency>
-      <groupId>org.apache.iceberg</groupId>
+      <groupId>com.github.apache.incubator-iceberg</groupId>
       <artifactId>iceberg-core</artifactId>
       <version>${iceberg.version}</version>
    </dependency>
    <dependency>
-      <groupId>org.apache.iceberg</groupId>
+      <groupId>com.github.apache.incubator-iceberg</groupId>
       <artifactId>iceberg-common</artifactId>
       <version>${iceberg.version}</version>
     </dependency>
     <dependency>
-      <groupId>org.apache.iceberg</groupId>
+      <groupId>com.github.apache.incubator-iceberg</groupId>
       <artifactId>iceberg-api</artifactId>
       <version>${iceberg.version}</version>
     </dependency>
diff --git a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/IcebergMetastore.java b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/IcebergMetastore.java
index e223c5a..94bc6bf 100644
--- a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/IcebergMetastore.java
+++ b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/IcebergMetastore.java
@@ -25,7 +25,6 @@ import org.apache.drill.metastore.components.views.Views;
 import org.apache.drill.metastore.iceberg.components.tables.IcebergTables;
 import org.apache.drill.metastore.iceberg.config.IcebergConfigConstants;
 import org.apache.drill.metastore.iceberg.exceptions.IcebergMetastoreException;
-import org.apache.drill.metastore.iceberg.operate.ExpirationHandler;
 import org.apache.drill.metastore.iceberg.schema.IcebergTableSchema;
 import org.apache.drill.shaded.guava.com.google.common.collect.MapDifference;
 import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
@@ -59,7 +58,6 @@ public class IcebergMetastore implements Metastore {
   private final org.apache.iceberg.Tables tables;
   private final String baseLocation;
   private final Map<String, String> commonProperties;
-  private final ExpirationHandler expirationHandler;
 
   /**
    * Table properties for each Iceberg table should be updated only once,
@@ -77,7 +75,6 @@ public class IcebergMetastore implements Metastore {
     this.tables = new HadoopTables(new Configuration(configuration));
     this.baseLocation = baseLocation(new Configuration(configuration));
     this.commonProperties = properties(IcebergConfigConstants.COMPONENTS_COMMON_PROPERTIES);
-    this.expirationHandler = new ExpirationHandler(config, new Configuration(configuration));
   }
 
   @Override
@@ -85,7 +82,7 @@ public class IcebergMetastore implements Metastore {
     Table table = loadTable(IcebergConfigConstants.COMPONENTS_TABLES_LOCATION,
       IcebergConfigConstants.COMPONENTS_TABLES_PROPERTIES,
       IcebergTables.SCHEMA, Tables.class);
-    return new IcebergTables(table, expirationHandler);
+    return new IcebergTables(table);
   }
 
   @Override
@@ -273,6 +270,5 @@ public class IcebergMetastore implements Metastore {
 
   @Override
   public void close() {
-    expirationHandler.close();
   }
 }
diff --git a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/components/tables/IcebergTables.java b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/components/tables/IcebergTables.java
index f4d32e9..a28248d 100644
--- a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/components/tables/IcebergTables.java
+++ b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/components/tables/IcebergTables.java
@@ -58,9 +58,9 @@ public class IcebergTables implements Tables, MetastoreContext<TableMetadataUni
   private final Table table;
   private final ExpirationHandler expirationHandler;
 
-  public IcebergTables(Table table, ExpirationHandler expirationHandler) {
+  public IcebergTables(Table table) {
     this.table = table;
-    this.expirationHandler = expirationHandler;
+    this.expirationHandler = new ExpirationHandler(table);
   }
 
   public MetastoreContext<TableMetadataUnit> context() {
diff --git a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/config/IcebergConfigConstants.java b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/config/IcebergConfigConstants.java
index 4419f68..6f326ac 100644
--- a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/config/IcebergConfigConstants.java
+++ b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/config/IcebergConfigConstants.java
@@ -52,11 +52,6 @@ public interface IcebergConfigConstants {
   String RELATIVE_PATH = LOCATION_NAMESPACE + "relative_path";
 
   /**
-   * Defines config which provides expiration period value.
-   */
-  String EXPIRATION_PERIOD = BASE + "expiration.period";
-
-  /**
    * Drill Iceberg Metastore components configuration properties namespace.
    */
   String COMPONENTS = BASE + "components.";
diff --git a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/operate/ExpirationHandler.java b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/operate/ExpirationHandler.java
index f06d496..25cd340 100644
--- a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/operate/ExpirationHandler.java
+++ b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/operate/ExpirationHandler.java
@@ -17,33 +17,15 @@
  */
 package org.apache.drill.metastore.iceberg.operate;
 
-import com.typesafe.config.ConfigValue;
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.metastore.iceberg.config.IcebergConfigConstants;
-import org.apache.drill.metastore.iceberg.exceptions.IcebergMetastoreException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.util.PropertyUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.time.Duration;
-import java.time.temporal.ChronoUnit;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 /**
  * Iceberg table generates metadata for each modification operation:
@@ -51,226 +33,55 @@ import java.util.regex.Pattern;
 * previously stored data files are not deleted. These files with the time
 * can occupy lots of space.
 * <p/>
- * Expiration handler expires outdated metadata and data files after configured expiration period.
- * Expiration period is set in the Iceberg Metastore config {@link IcebergConfigConstants#EXPIRATION_PERIOD}.
- * Units should correspond to {@link ChronoUnit} values that do not have estimated duration
- * (millis, seconds, minutes, hours, days).
- * If expiration period is not set, zero or negative, expiration process will not be executed.
- * <p/>
- * Expiration process is launched using executor service which allows to execute only one thread at a time,
- * idle thread is not kept in the core pool since it is assumed that expiration process won't be launched to often.
- * <p/>
- * During Drillbit shutdown, if there are expiration tasks in the queue, they will be discarded in order to
- * unblock Drillbit shutdown process.
+ * Table metadata in Iceberg is expired automatically
+ * if {@link TableProperties#METADATA_DELETE_AFTER_COMMIT_ENABLED} is set to true.
+ * The number of metadata files to retain is configured using {@link TableProperties#METADATA_PREVIOUS_VERSIONS_MAX}.
+ * Snapshot and data file expiration must be triggered manually to align with the metadata expiration process;
+ * the same table properties determine whether expiration is needed and how many
+ * snapshots should be retained.
 */
-public class ExpirationHandler implements AutoCloseable {
+public class ExpirationHandler {

  private static final Logger logger = LoggerFactory.getLogger(ExpirationHandler.class);

-  private static final Pattern METADATA_VERSION_PATTERN = Pattern.compile("^v([0-9]+)\\..*");
-
-  // contains Iceberg table location and its last expiration time
-  private final Map<String, Long> expirationStatus = new ConcurrentHashMap<>();
-  private final Configuration configuration;
-  private final long expirationPeriod;
-  private volatile ExecutorService executorService;
-
-  public ExpirationHandler(DrillConfig config, Configuration configuration) {
-    this.configuration = configuration;
-    this.expirationPeriod = expirationPeriod(config);
-    logger.debug("Drill Iceberg Metastore expiration period: {}", expirationPeriod);
-  }
-
-  /**
-   * Checks if expiration process needs to be performed for the given Iceberg table
-   * by comparing stored last expiration time.
-   * If difference between last expiration time and current time is more or equal to
-   * expiration period, launches expiration process.
-   * If expiration period is zero or negative, no expiration process will be launched.
-   *
-   * @param table Iceberg table instance
-   * @return true if expiration process was launched, false otherwise
-   */
-  public boolean expire(Table table) {
-    if (expirationPeriod <= 0) {
-      return false;
-    }
-
-    long current = System.currentTimeMillis();
-    Long last = expirationStatus.putIfAbsent(table.location(), current);
-
-    if (last != null && current - last >= expirationPeriod) {
-      expirationStatus.put(table.location(), current);
+  private final Table table;
+  private final boolean shouldExpire;
+  private final int retainNumber;

-      ExecutorService executorService = executorService();
-      executorService.submit(() -> {
-        logger.debug("Expiring Iceberg table [{}] metadata", table.location());
-        table.expireSnapshots()
-          .expireOlderThan(current)
-          .commit();
-        // TODO: Replace with table metadata expiration through Iceberg API
-        // when https://github.com/apache/incubator-iceberg/issues/181 is resolved
-        // table.expireTableMetadata().expireOlderThan(current).commit();
-        expireTableMetadata(table);
-      });
-      return true;
-    }
-    return false;
-  }
-
-  public long expirationPeriod() {
-    return expirationPeriod;
-  }
+  public ExpirationHandler(Table table) {
+    this.table = table;

-  @Override
-  public void close() {
-    if (executorService != null) {
-      // unlike shutdown(), shutdownNow() discards all queued waiting tasks
-      // this is done in order to unblock Drillbit shutdown
-      executorService.shutdownNow();
-    }
-  }
-
-  private long expirationPeriod(DrillConfig config) {
-    if (config.hasPath(IcebergConfigConstants.EXPIRATION_PERIOD)) {
-      Duration duration = config.getConfig(IcebergConfigConstants.EXPIRATION_PERIOD).entrySet().stream()
-        .map(this::duration)
-        .reduce(Duration.ZERO, Duration::plus);
-      return duration.toMillis();
-    }
-    return 0;
-  }
-
-  private Duration duration(Map.Entry<String, ConfigValue> entry) {
-    String amountText = String.valueOf(entry.getValue().unwrapped());
-    String unitText = entry.getKey().toUpperCase();
-    try {
-      long amount = Long.parseLong(amountText);
-      ChronoUnit unit = ChronoUnit.valueOf(unitText);
-      return Duration.of(amount, unit);
-    } catch (NumberFormatException e) {
-      throw new IcebergMetastoreException(String.format("Error when parsing expiration period config. " +
-        "Unable to convert [%s] into long", amountText), e);
-    } catch (IllegalArgumentException e) {
-      throw new IcebergMetastoreException(String.format("Error when parsing expiration period config. " +
-        "Unable to convert [%s] into [%s]", unitText, ChronoUnit.class.getCanonicalName()), e);
-    }
+    Map<String, String> properties = table.properties();
+    this.shouldExpire = PropertyUtil.propertyAsBoolean(properties,
+      TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED,
+      TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT);
+    this.retainNumber = PropertyUtil.propertyAsInt(properties,
+      TableProperties.METADATA_PREVIOUS_VERSIONS_MAX,
+      TableProperties.METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT);
  }

  /**
-   * Initializes executor service instance using DCL.
-   * Created thread executor instance allows to execute only one thread at a time
-   * but unlike single thread executor does not keep this thread in the pool.
-   * Custom thread factory is used to define Iceberg Metastore specific thread names.
-   *
-   * @return executor service instance
+   * Expires snapshots and related data if needed,
+   * based on the given table property values.
   */
-  private ExecutorService executorService() {
-    if (executorService == null) {
-      synchronized (this) {
-        if (executorService == null) {
-          this.executorService = new ThreadPoolExecutor(0, 1, 0L,
-            TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new IcebergThreadFactory());
-        }
-      }
+  public void expire() {
+    if (shouldExpire) {
+      table.expireSnapshots()
+        .expireOlderThan(System.currentTimeMillis())
+        .retainLast(retainNumber)
+        .commit();
    }
-    return executorService;
  }

  /**
-   * Expires outdated Iceberg table metadata files.
-   * Reads current Iceberg table metadata version from version-hint.text file
-   * and deletes all metadata files that end with ".metadata.json" and have
-   * version less than current.
-   * <p/>
-   * Should be replaced with
-   * <code>table.expireTableMetadata().expireOlderThan(current).commit();</code>
-   * when <a href="https://github.com/apache/incubator-iceberg/issues/181">Issue#181</a>
-   * is resolved.
-   *
-   * @param table Iceberg table instance
+   * Expires snapshots and related data, ignoring possible exceptions.
   */
-  private void expireTableMetadata(Table table) {
+  public void expireQuietly() {
    try {
-      String location = table.location();
-      Path metadata = new Path(location, "metadata");
-      FileSystem fs = metadata.getFileSystem(configuration);
-      for (FileStatus fileStatus : listExpiredMetadataFiles(fs, metadata)) {
-        if (fs.delete(fileStatus.getPath(), false)) {
-          logger.debug("Deleted Iceberg table [{}] metadata file [{}]", table.location(), fileStatus.getPath());
-        }
-      }
-    } catch (NumberFormatException | IOException e) {
-      logger.warn("Unable to expire Iceberg table [{}] metadata files", table.location(), e);
-    }
-  }
-
-  /**
-   * Reads current Iceberg table metadata version from version-hint.text file
-   * and returns all metadata files that end with ".metadata.json" and have
-   * version less than current.
-   *
-   * @param fs file system
-   * @param metadata pth to Iceberg metadata
-   * @return metadata files with version less than current
-   * @throws IOException in case of error listing file statuses
-   */
-  private FileStatus[] listExpiredMetadataFiles(FileSystem fs, Path metadata) throws IOException {
-    int currentVersion = currentVersion(fs, metadata);
-    return fs.listStatus(metadata, path -> {
-      if (path.getName().endsWith(".metadata.json")) {
-        int version = parseVersion(path);
-        return version != -1 && currentVersion > version;
-      }
-      return false;
-    });
-  }
-
-  /**
-   * Reads current table metadata version from version-hint.text file.
-   *
-   * @param fs file system
-   * @param metadata table metadata path
-   * @return current table metadata version
-   * @throws IOException if unable to read current table metadata version
-   */
-  private int currentVersion(FileSystem fs, Path metadata) throws IOException {
-    Path versionHintFile = new Path(metadata, "version-hint.text");
-    try (BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(versionHintFile)))) {
-      return Integer.parseInt(in.readLine().replace("\n", ""));
-    }
-  }
-
-  /**
-   * Extracts metadata version from table metadata file name.
-   * Example: v1.metadata.json -> 1, v15.metadata.json -> 15
-   *
-   * @param path table metadata file path
-   * @return metadata version
-   */
-  private int parseVersion(Path path) {
-    Matcher matcher = METADATA_VERSION_PATTERN.matcher(path.getName());
-    if (matcher.find() && matcher.groupCount() == 1) {
-      return Integer.parseInt(matcher.group(1));
-    }
-    throw new NumberFormatException("Unable to parse version for path " + path);
-  }
-
-  /**
-   * Wraps default thread factory and adds Iceberg Metastore prefix to the original thread name.
-   * Is used to uniquely identify Iceberg metastore threads.
-   * Example: drill-iceberg-metastore-pool-1-thread-1
-   */
-  private static class IcebergThreadFactory implements ThreadFactory {
-
-    private static final String THREAD_PREFIX = "drill-iceberg-metastore-";
-    private final ThreadFactory delegate = Executors.defaultThreadFactory();
-
-    @Override
-    public Thread newThread(Runnable runnable) {
-      Thread thread = delegate.newThread(runnable);
-      thread.setName(THREAD_PREFIX + thread.getName());
-      return thread;
+      expire();
+    } catch (ValidationException | CommitFailedException e) {
+      logger.warn("Unable to expire snapshots: {}", e.getMessage());
+      logger.debug("Error when expiring snapshots", e);
    }
  }
 }
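To make the reworked contract concrete, here is a short usage sketch (not part of this commit): callers construct the handler once per Iceberg table and invoke the quiet variant after committing modifications, exactly as the IcebergTables constructor above and the IcebergModify change below do. The wrapper class here is hypothetical and only frames the two calls.

import org.apache.drill.metastore.iceberg.operate.ExpirationHandler;
import org.apache.iceberg.Table;

class ExpirationUsageSketch {

  // Hypothetical caller: the handler reads the expiration-related table
  // properties in its constructor and decides on every call whether to expire.
  void afterCommit(Table table) {
    ExpirationHandler handler = new ExpirationHandler(table);

    // expireQuietly() swallows ValidationException and CommitFailedException,
    // so a failed cleanup never fails the data modification that triggered it
    handler.expireQuietly();
  }
}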
diff --git a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/operate/IcebergModify.java b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/operate/IcebergModify.java
index a510f86..e29571f 100644
--- a/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/operate/IcebergModify.java
+++ b/metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/operate/IcebergModify.java
@@ -80,7 +80,8 @@ public class IcebergModify<T> implements Modify<T> {
     operations.forEach(op -> op.add(transaction));
     transaction.commitTransaction();
 
-    // check if Iceberg table metadata needs to be expired after each modification operation
-    context.expirationHandler().expire(context.table());
+    // the expiration process should not interfere with data modification operations:
+    // if expiration fails, it will be re-attempted after the next modification
+    context.expirationHandler().expireQuietly();
   }
 }
diff --git a/metastore/iceberg-metastore/src/main/resources/drill-metastore-module.conf b/metastore/iceberg-metastore/src/main/resources/drill-metastore-module.conf
index 33fe795..de1f1f9 100644
--- a/metastore/iceberg-metastore/src/main/resources/drill-metastore-module.conf
+++ b/metastore/iceberg-metastore/src/main/resources/drill-metastore-module.conf
@@ -34,18 +34,14 @@ drill.metastore.iceberg: {
     relative_path: ${drill.exec.zk.root}"/metastore/iceberg"
   }
 
-  // Specifies time period after which outdated Iceberg table metadata will be expired,
-  // unit names must correspond to java.time.temporal.ChronoUnit enum values
-  // that do not have estimated duration (millis, seconds, minutes, hours, days).
-  // Example: hours: 10, minutes: 20
-  expiration.period: {
-    days: 5
-  }
-
   components: {
     // Common properties for all Iceberg tables from org.apache.iceberg.TableProperties can be specified
     common.properties: {
-      write.metadata.compression-codec: "none"
+      write.metadata.compression-codec: "none",
+      // Enables metadata expiration to avoid consuming space with historical data.
+      // In Drill the same table properties also drive the snapshot expiration process.
+      write.metadata.delete-after-commit.enabled: true,
+      write.metadata.previous-versions-max: 2
     },
 
     tables: {
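For context on how this block reaches Iceberg: the Metastore flattens drill.metastore.iceberg.components.common.properties into a plain String-to-String map and passes it as table properties when a table is created or refreshed (see the commonProperties handling in IcebergMetastore earlier in this diff). The helper below is only an assumed sketch of that conversion using the Typesafe Config API; the real implementation may differ in details such as key quoting and value rendering.

import com.typesafe.config.Config;

import java.util.HashMap;
import java.util.Map;

class CommonPropertiesSketch {

  // Assumed sketch: turn the HOCON block into the String-to-String map
  // that Iceberg expects as table properties.
  static Map<String, String> commonTableProperties(Config drillConfig) {
    Map<String, String> properties = new HashMap<>();
    Config common = drillConfig.getConfig("drill.metastore.iceberg.components.common.properties");
    common.entrySet().forEach(entry ->
        properties.put(entry.getKey(), String.valueOf(entry.getValue().unwrapped())));
    return properties;
  }
}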
diff --git a/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/operate/TestExpirationHandler.java b/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/operate/TestExpirationHandler.java
index 90ef55b..d330b3b 100644
--- a/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/operate/TestExpirationHandler.java
+++ b/metastore/iceberg-metastore/src/test/java/org/apache/drill/metastore/iceberg/operate/TestExpirationHandler.java
@@ -17,127 +17,98 @@
  */
 package org.apache.drill.metastore.iceberg.operate;
 
+import com.typesafe.config.Config;
 import com.typesafe.config.ConfigValueFactory;
 import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.metastore.components.tables.TableMetadataUnit;
+import org.apache.drill.metastore.components.tables.Tables;
 import org.apache.drill.metastore.iceberg.IcebergBaseTest;
+import org.apache.drill.metastore.iceberg.IcebergMetastore;
+import org.apache.drill.metastore.iceberg.components.tables.IcebergTables;
 import org.apache.drill.metastore.iceberg.config.IcebergConfigConstants;
-import org.apache.drill.metastore.iceberg.exceptions.IcebergMetastoreException;
-import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
 import org.junit.Test;
 
-import java.util.concurrent.TimeUnit;
+import java.io.File;
+import java.util.stream.IntStream;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 public class TestExpirationHandler extends IcebergBaseTest {
 
   @Test
-  public void testConfigEmpty() {
-    ExpirationHandler expirationHandler = new ExpirationHandler(DrillConfig.create(), baseHadoopConfig());
-    assertEquals(0, expirationHandler.expirationPeriod());
-  }
+  public void testNoExpiration() {
+    IcebergTables tables = tables("no-expiration", false, 2);

-  @Test
-  public void testConfigOneUnit() {
-    DrillConfig config = new DrillConfig(DrillConfig.create()
-      .withValue(IcebergConfigConstants.EXPIRATION_PERIOD + ".hours",
-        ConfigValueFactory.fromAnyRef(5)));
-    ExpirationHandler expirationHandler = new ExpirationHandler(config, baseHadoopConfig());
-    assertEquals(TimeUnit.HOURS.toMillis(5), expirationHandler.expirationPeriod());
-  }
+    // check that there is no history
+    assertEquals(0, tables.table().history().size());

-  @Test
-  public void testConfigSeveralUnits() {
-    DrillConfig config = new DrillConfig(DrillConfig.create()
-      .withValue(IcebergConfigConstants.EXPIRATION_PERIOD + ".hours",
-        ConfigValueFactory.fromAnyRef(5))
-      .withValue(IcebergConfigConstants.EXPIRATION_PERIOD + ".minutes",
-        ConfigValueFactory.fromAnyRef(10)));
-    ExpirationHandler expirationHandler = new ExpirationHandler(config, baseHadoopConfig());
-    assertEquals(TimeUnit.HOURS.toMillis(5) + TimeUnit.MINUTES.toMillis(10),
-      expirationHandler.expirationPeriod());
-  }
+    int operationsNumber = 5;
+    execute(tables, operationsNumber);

-  @Test
-  public void testConfigNegativeValue() {
-    DrillConfig config = new DrillConfig(DrillConfig.create()
-      .withValue(IcebergConfigConstants.EXPIRATION_PERIOD + ".hours",
-        ConfigValueFactory.fromAnyRef(-5)));
-    ExpirationHandler expirationHandler = new ExpirationHandler(config, baseHadoopConfig());
-    assertEquals(TimeUnit.HOURS.toMillis(-5), expirationHandler.expirationPeriod());
+    // check that the number of executed operations is the same as the number of history records
+    assertEquals(operationsNumber, tables.table().history().size());
  }

  @Test
-  public void testConfigIncorrectUnit() {
-    DrillConfig config = new DrillConfig(DrillConfig.create()
-      .withValue(IcebergConfigConstants.EXPIRATION_PERIOD + ".hour",
-        ConfigValueFactory.fromAnyRef(5)));
+  public void testExpiration() {
+    int retainNumber = 3;
+    IcebergTables tables = tables("expiration", true, retainNumber);

-    thrown.expect(IcebergMetastoreException.class);
-    new ExpirationHandler(config, baseHadoopConfig());
-  }
+    // check that there is no history
+    assertEquals(0, tables.table().history().size());

-  @Test
-  public void testConfigIncorrectValue() {
-    DrillConfig config = new DrillConfig(DrillConfig.create()
-      .withValue(IcebergConfigConstants.EXPIRATION_PERIOD + ".hours",
-        ConfigValueFactory.fromAnyRef("abc")));
+    execute(tables, 5);

-    thrown.expect(IcebergMetastoreException.class);
-    new ExpirationHandler(config, baseHadoopConfig());
+    // check that the number of history records corresponds to the expected retain number
+    assertEquals(retainNumber, tables.table().history().size());
  }

  @Test
-  public void testExpireZeroExpirationPeriod() {
-    DrillConfig config = new DrillConfig(DrillConfig.create()
-      .withValue(IcebergConfigConstants.EXPIRATION_PERIOD + ".millis",
-        ConfigValueFactory.fromAnyRef(0)));
-
-    ExpirationHandler expirationHandler = new ExpirationHandler(config, baseHadoopConfig());
-    Table table = mock(Table.class);
-    assertFalse(expirationHandler.expire(table));
-  }
+  public void testSubsequentExpiration() {
+    String name = "subsequent-expiration";
+    int retainNumber = 2;
+    int operationsNumber = 5;

-  @Test
-  public void testExpireNegativeExpirationPeriod() {
-    DrillConfig config = new DrillConfig(DrillConfig.create()
-      .withValue(IcebergConfigConstants.EXPIRATION_PERIOD + ".millis",
-        ConfigValueFactory.fromAnyRef(-10)));
-
-    ExpirationHandler expirationHandler = new ExpirationHandler(config, baseHadoopConfig());
-    Table table = mock(Table.class);
-    assertFalse(expirationHandler.expire(table));
-  }
+    IcebergTables initialTables = tables(name, false, retainNumber);

-  @Test
-  public void testExpireFirstTime() {
-    DrillConfig config = new DrillConfig(DrillConfig.create()
-      .withValue(IcebergConfigConstants.EXPIRATION_PERIOD + ".millis",
-        ConfigValueFactory.fromAnyRef(1)));
+    execute(initialTables, operationsNumber);

-    ExpirationHandler expirationHandler = new ExpirationHandler(config, baseHadoopConfig());
+    // check that the number of executed operations is the same as the number of history records
+    assertEquals(operationsNumber, initialTables.table().history().size());

-    Table table = mock(Table.class);
-    when(table.location()).thenReturn("/tmp/table");
+    // update table configuration, allow expiration
+    IcebergTables updatedTables = tables(name, true, retainNumber);

-    assertFalse(expirationHandler.expire(table));
-  }
+    // check that the number of history records did not change
+    assertEquals(operationsNumber, updatedTables.table().history().size());

-  @Test
-  public void testExpireBefore() {
-    DrillConfig config = new DrillConfig(DrillConfig.create()
-      .withValue(IcebergConfigConstants.EXPIRATION_PERIOD + ".days",
-        ConfigValueFactory.fromAnyRef(1)));
+    execute(updatedTables, operationsNumber);

-    ExpirationHandler expirationHandler = new ExpirationHandler(config, baseHadoopConfig());
+    // check that the number of history records corresponds to the expected retain number
+    assertEquals(retainNumber, updatedTables.table().history().size());
+  }

-    Table table = mock(Table.class);
-    when(table.location()).thenReturn("/tmp/table");
+  private IcebergTables tables(String name, boolean shouldExpire, int retainNumber) {
+    Config config = baseIcebergConfig(new File(defaultFolder.getRoot(), name))
+      .withValue(IcebergConfigConstants.COMPONENTS_COMMON_PROPERTIES + "." + TableProperties.METADATA_PREVIOUS_VERSIONS_MAX,
+        ConfigValueFactory.fromAnyRef(retainNumber))
+      .withValue(IcebergConfigConstants.COMPONENTS_COMMON_PROPERTIES + "." + TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED,
+        ConfigValueFactory.fromAnyRef(shouldExpire));
+    DrillConfig drillConfig = new DrillConfig(config);
+    return (IcebergTables) new IcebergMetastore(drillConfig).tables();
+  }

-    assertFalse(expirationHandler.expire(table));
-    assertFalse(expirationHandler.expire(table));
+  private void execute(Tables tables, int operationsNumber) {
+    IntStream.range(0, operationsNumber)
+      .mapToObj(i -> TableMetadataUnit.builder()
+        .storagePlugin("dfs")
+        .workspace("tmp")
+        .tableName("nation")
+        .metadataKey("dir" + i)
+        .build())
+      .forEach(table -> tables.modify()
+        .overwrite(table)
+        .execute());
  }
 }