[ https://issues.apache.org/jira/browse/DRILL-7331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16897242#comment-16897242 ]
ASF GitHub Bot commented on DRILL-7331: --------------------------------------- arina-ielchiieva commented on pull request #1831: DRILL-7331: Drill Iceberg Metastore metadata expiration URL: https://github.com/apache/drill/pull/1831#discussion_r309234038 ########## File path: metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/operate/ExpirationHandler.java ########## @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.metastore.iceberg.operate; + +import com.typesafe.config.ConfigValue; +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.metastore.iceberg.config.IcebergConfigConstants; +import org.apache.drill.metastore.iceberg.exceptions.IcebergMetastoreException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.Table; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Iceberg table generates metadata for each modification operation: + * snapshot, manifest file, table metadata file. Also when performing delete operation, + * previously stored data files are not deleted. These files with the time + * can occupy lots of space. + * <p/> + * Expiration handler expires outdated metadata and data files after configured expiration period. + * Expiration period is set in the Iceberg Metastore config {@link IcebergConfigConstants#EXPIRATION_PERIOD}. + * Units should correspond to {@link ChronoUnit} values that do not have estimated duration + * (millis, seconds, minutes, hours, days). + * If expiration period is not set, zero or negative, expiration process will not be executed. 
+ * <p/> + * Expiration process is launched using executor service which allows to execute only one thread at a time, + * idle thread is not kept in the core pool since it is assumed the expiration process won't be launched to often. + * <p/> + * During Drillbit shutdown, if there are expiration tasks in the queue, they will be discarded in order to + * unblock Drillbit shutdown process. + */ +public class ExpirationHandler implements AutoCloseable { + + private static final Logger logger = LoggerFactory.getLogger(ExpirationHandler.class); + + private static final Pattern METADATA_VERSION_PATTERN = Pattern.compile("^v([0-9]+)\\..*"); + + // contains Iceberg table location and its last expiration time + private final Map<String, Long> expirationStatus = new ConcurrentHashMap<>(); + private final Configuration configuration; + private final long expirationPeriod; + private volatile ExecutorService executorService; + + public ExpirationHandler(DrillConfig config, Configuration configuration) { + this.configuration = configuration; + this.expirationPeriod = expirationPeriod(config); + logger.debug("Drill Iceberg Metastore expiration period: {}", expirationPeriod); + } + + /** + * Checks if expiration process needs to be performed for the given Iceberg table + * by comparing stored last expiration time. + * If difference between last expiration time and current time is more or equal to + * provided expiration period in the Iceberg Metastore config, launches expiration process. + * If expiration period is zero or negative, no expiration process will be launched. 
+ * + * @param table Iceberg table instance + * @return true if expiration process was launched, false otherwise + */ + public boolean expire(Table table) { + if (expirationPeriod <= 0) { + return false; + } + + long current = System.currentTimeMillis(); + Long last = expirationStatus.putIfAbsent(table.location(), current); + + if (last != null && current - last >= expirationPeriod) { + expirationStatus.put(table.location(), current); + + ExecutorService executorService = executorService(); + executorService.submit(() -> { + logger.debug("Expiring Iceberg table [{}] metadata", table.location()); + table.expireSnapshots() + .expireOlderThan(current) + .commit(); + //todo Add table metadata expiration through Iceberg API + // when https://github.com/apache/incubator-iceberg/issues/181 is resolved + // table.expireTableMetadata().expireOlderThan(current).commit(); + // todo remove when https://github.com/apache/incubator-iceberg/issues/181 is resolved + expireTableMetadata(table); + }); + return true; + } + return false; + } + + public long expirationPeriod() { + return expirationPeriod; + } + + @Override + public void close() { + if (executorService != null) { + // unlike shutdown(), shutdownNow() discards all queued waiting tasks + // this is done in order to unblock Drillbit shutdown + executorService.shutdownNow(); + } + } + + private long expirationPeriod(DrillConfig config) { + if (config.hasPath(IcebergConfigConstants.EXPIRATION_PERIOD)) { + Duration duration = config.getConfig(IcebergConfigConstants.EXPIRATION_PERIOD).entrySet().stream() + .map(this::duration) + .reduce(Duration.ZERO, Duration::plus); + return duration.toMillis(); + } + return 0; + } + + private Duration duration(Map.Entry<String, ConfigValue> entry) { + try { + return Duration.of(Long.parseLong(String.valueOf(entry.getValue().unwrapped())), Review comment: Split into several variables. 
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Support Iceberg metadata expiration > ----------------------------------- > > Key: DRILL-7331 > URL: https://issues.apache.org/jira/browse/DRILL-7331 > Project: Apache Drill > Issue Type: Sub-task > Reporter: Arina Ielchiieva > Assignee: Arina Ielchiieva > Priority: Major > Labels: doc-impacting > Fix For: 1.17.0 > > > Currently Iceberg produces snapshots and table metadata for each modification > operation. > In addition, it does not retire outdated data files. All this information is stored > in the metadata folder and over time occupies a lot of space. This Jira aims to > expire outdated snapshots, table metadata and data files based on a configured > expiration period. > Iceberg already has a mechanism to retire snapshots and data files. A mechanism > to expire table metadata needs to be implemented > (https://github.com/apache/incubator-iceberg/issues/181). -- This message was sent by Atlassian JIRA (v7.6.14#76016)