[ 
https://issues.apache.org/jira/browse/DRILL-7331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16897243#comment-16897243
 ] 

ASF GitHub Bot commented on DRILL-7331:
---------------------------------------

arina-ielchiieva commented on pull request #1831: DRILL-7331: Drill Iceberg 
Metastore metadata expiration
URL: https://github.com/apache/drill/pull/1831#discussion_r309234580
 
 

 ##########
 File path: 
metastore/iceberg-metastore/src/main/java/org/apache/drill/metastore/iceberg/operate/ExpirationHandler.java
 ##########
 @@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.metastore.iceberg.operate;
+
+import com.typesafe.config.ConfigValue;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.metastore.iceberg.config.IcebergConfigConstants;
+import org.apache.drill.metastore.iceberg.exceptions.IcebergMetastoreException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.Table;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Iceberg table generates metadata for each modification operation:
+ * snapshot, manifest file, table metadata file. Also when performing delete 
operation,
+ * previously stored data files are not deleted. Over time, these files
+ * can occupy lots of space.
+ * <p/>
+ * Expiration handler expires outdated metadata and data files after 
configured expiration period.
+ * Expiration period is set in the Iceberg Metastore config {@link 
IcebergConfigConstants#EXPIRATION_PERIOD}.
+ * Units should correspond to {@link ChronoUnit} values that do not have 
estimated duration
+ * (millis, seconds, minutes, hours, days).
+ * If expiration period is not set, zero or negative, expiration process will 
not be executed.
+ * <p/>
+ * Expiration process is launched using an executor service which allows only 
one thread to execute at a time;
+ * an idle thread is not kept in the core pool since it is assumed the expiration 
process won't be launched too often.
+ * <p/>
+ * During Drillbit shutdown, if there are expiration tasks in the queue, they 
will be discarded in order to
+ * unblock Drillbit shutdown process.
+ */
+public class ExpirationHandler implements AutoCloseable {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(ExpirationHandler.class);
+
+  private static final Pattern METADATA_VERSION_PATTERN = 
Pattern.compile("^v([0-9]+)\\..*");
+
+  // contains Iceberg table location and its last expiration time
+  private final Map<String, Long> expirationStatus = new ConcurrentHashMap<>();
+  private final Configuration configuration;
+  private final long expirationPeriod;
+  private volatile ExecutorService executorService;
+
+  public ExpirationHandler(DrillConfig config, Configuration configuration) {
+    this.configuration = configuration;
+    this.expirationPeriod = expirationPeriod(config);
+    logger.debug("Drill Iceberg Metastore expiration period: {}", 
expirationPeriod);
+  }
+
+  /**
+   * Checks if expiration process needs to be performed for the given Iceberg 
table
+   * by comparing stored last expiration time.
+   * If the difference between last expiration time and current time is greater than or 
equal to
+   * provided expiration period in the Iceberg Metastore config, launches 
expiration process.
+   * If expiration period is zero or negative, no expiration process will be 
launched.
+   *
+   * @param table Iceberg table instance
+   * @return true if expiration process was launched, false otherwise
+   */
+  public boolean expire(Table table) {
+    if (expirationPeriod <= 0) {
+      return false;
+    }
+
+    long current = System.currentTimeMillis();
+    Long last = expirationStatus.putIfAbsent(table.location(), current);
+
+    if (last != null && current - last >= expirationPeriod) {
+      expirationStatus.put(table.location(), current);
+
+      ExecutorService executorService = executorService();
+      executorService.submit(() -> {
+        logger.debug("Expiring Iceberg table [{}] metadata", table.location());
+        table.expireSnapshots()
+          .expireOlderThan(current)
+          .commit();
+        //todo Add table metadata expiration through Iceberg API
+        // when https://github.com/apache/incubator-iceberg/issues/181 is 
resolved
+        // table.expireTableMetadata().expireOlderThan(current).commit();
+        // todo remove when 
https://github.com/apache/incubator-iceberg/issues/181 is resolved
+        expireTableMetadata(table);
+      });
+      return true;
+    }
+    return false;
+  }
+
+  public long expirationPeriod() {
+    return expirationPeriod;
+  }
+
+  @Override
+  public void close() {
+    if (executorService != null) {
+      // unlike shutdown(), shutdownNow() discards all queued waiting tasks
+      // this is done in order to unblock Drillbit shutdown
+      executorService.shutdownNow();
+    }
+  }
+
+  private long expirationPeriod(DrillConfig config) {
+    if (config.hasPath(IcebergConfigConstants.EXPIRATION_PERIOD)) {
+      Duration duration = 
config.getConfig(IcebergConfigConstants.EXPIRATION_PERIOD).entrySet().stream()
+        .map(this::duration)
+        .reduce(Duration.ZERO, Duration::plus);
+      return duration.toMillis();
+    }
+    return 0;
+  }
+
+  private Duration duration(Map.Entry<String, ConfigValue> entry) {
+    try {
+      return 
Duration.of(Long.parseLong(String.valueOf(entry.getValue().unwrapped())),
+        ChronoUnit.valueOf(entry.getKey().toUpperCase()));
+    } catch (NumberFormatException e) {
+      throw new IcebergMetastoreException(String.format("Error when parsing 
expiration period config. " +
+        "Unable to convert [%s] into long", entry.getValue().unwrapped()), e);
+    } catch (IllegalArgumentException e) {
+      throw new IcebergMetastoreException(String.format("Error when parsing 
expiration period config. " +
+        "Unable to convert [%s] into [%s]", entry.getKey(), 
ChronoUnit.class.getCanonicalName()), e);
+    }
+  }
+
+  /**
+   * Initializes executor service instance using DCL.
+   * The created thread pool executor allows only one thread to execute at a 
time
+   * but, unlike a single thread executor, does not keep this thread in the pool.
+   * Custom thread factory is used to define Iceberg Metastore specific thread 
names.
+   *
+   * @return executor service instance
+   */
+  private ExecutorService executorService() {
+    if (executorService == null) {
+      synchronized (this) {
+        if (executorService == null) {
+          this.executorService = new ThreadPoolExecutor(0, 1, 0L,
+            TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new 
IcebergThreadFactory());
+        }
+      }
+    }
+    return executorService;
+  }
+
+  /**
+   * Expires outdated Iceberg table metadata files.
+   * Reads current Iceberg table metadata version from version-hint.text file
+   * and deletes all metadata files that end with ".metadata.json" and have
+   * version less than current.
+   * <p/>
+   * Should be replaced with
+   * 
<code>table.expireTableMetadata().expireOlderThan(current).commit();</code>
+   * when <a 
href="https://github.com/apache/incubator-iceberg/issues/181">Issue#181</a>
+   * is resolved.
+   *
+   * @param table Iceberg table instance
+   */
+  private void expireTableMetadata(Table table) {
+    try {
+      String location = table.location();
+      Path metadata = new Path(location, "metadata");
+      FileSystem fs = metadata.getFileSystem(configuration);
+      int currentVersion = currentVersion(fs, metadata);
+
+      for (FileStatus fileStatus : fs.listStatus(metadata, path -> {
 
 Review comment:
   Done.
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support Iceberg metadata expiration
> -----------------------------------
>
>                 Key: DRILL-7331
>                 URL: https://issues.apache.org/jira/browse/DRILL-7331
>             Project: Apache Drill
>          Issue Type: Sub-task
>            Reporter: Arina Ielchiieva
>            Assignee: Arina Ielchiieva
>            Priority: Major
>              Labels: doc-impacting
>             Fix For: 1.17.0
>
>
> Currently Iceberg produces snapshots and table metadata for each modification 
> operation.
> Plus it does not retire outdated data files. All this information is stored 
> in the metadata folder and over time occupies lots of space. This Jira aims to 
> expire outdated snapshots, table metadata and data files based on a configured 
> expiration period.
> Iceberg already has a mechanism to retire snapshots and data files. A mechanism 
> to expire table metadata needs to be implemented 
> (https://github.com/apache/incubator-iceberg/issues/181).



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)

Reply via email to