HIVE-20025: Clean-up of event files created by HiveProtoLoggingHook (Sankar Hariappan, reviewed by Harish Jaiprakash, Anishek Agarwal)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/6311e0b0
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/6311e0b0
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/6311e0b0

Branch: refs/heads/master-txnstats
Commit: 6311e0b031b5937d39bde8aad9916c8d0911f0b3
Parents: ee8c72a
Author: Sankar Hariappan <sank...@apache.org>
Authored: Wed Jul 4 13:08:00 2018 +0530
Committer: Sankar Hariappan <sank...@apache.org>
Committed: Wed Jul 4 13:08:00 2018 +0530

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java | 11 ++
 .../TestHiveProtoEventsCleanerTask.java | 141 ++++++++++++++++
 .../metastore/HiveProtoEventsCleanerTask.java | 168 +++++++++++++++++++
 .../hive/ql/hooks/HiveProtoLoggingHook.java | 15 +-
 .../hive/ql/hooks/TestHiveProtoLoggingHook.java | 2 +-
 .../hive/metastore/conf/MetastoreConf.java | 3 +-
 6 files changed, 331 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/6311e0b0/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 7ef22d6..2da9086 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -625,6 +625,17 @@ public class HiveConf extends Configuration {
         "Table alias will be added to column names for queries of type \"select *\" or \n" +
         "if query explicitly uses table alias \"select r1.x..\"."),
 
+    HIVE_PROTO_EVENTS_BASE_PATH("hive.hook.proto.base-directory", "", // blank: no proto logging and no cleanup
+        "Base directory into which the proto event messages are written by HiveProtoLoggingHook."),
+    HIVE_PROTO_EVENTS_QUEUE_CAPACITY("hive.hook.proto.queue.capacity", 64,
+        "Queue capacity for the proto events logging threads."),
+    HIVE_PROTO_EVENTS_CLEAN_FREQ("hive.hook.proto.events.clean.freq", "1d", // HiveProtoEventsCleanerTask run interval
+        new TimeValidator(TimeUnit.DAYS),
+        "Frequency at which timer task runs to purge expired proto event files."),
+    HIVE_PROTO_EVENTS_TTL("hive.hook.proto.events.ttl", "7d", // date=D dirs with D <= UTC date of (now - TTL) are purged
+        new TimeValidator(TimeUnit.DAYS),
+        "Time-To-Live (TTL) of proto event files before cleanup."),
+
     // Hadoop Configuration Properties
     // Properties with null values are ignored and exist only for the purpose of giving us
     // a symbolic name to reference in the Hive source code. Properties with non-null


http://git-wip-us.apache.org/repos/asf/hive/blob/6311e0b0/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveProtoEventsCleanerTask.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveProtoEventsCleanerTask.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveProtoEventsCleanerTask.java
new file mode 100644
index 0000000..e187fad
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveProtoEventsCleanerTask.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.HiveProtoEventsCleanerTask;
+import org.apache.hadoop.hive.metastore.utils.JavaUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestHiveProtoEventsCleanerTask {
+  protected static final Logger LOG = LoggerFactory.getLogger(TestHiveProtoEventsCleanerTask.class);
+
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder();
+  private Path baseDir;
+  private HiveConf hiveConf;
+  private SystemClock clock = SystemClock.getInstance();
+  private HiveProtoEventsCleanerTask cleanerTask;
+  private FileSystem fs;
+
+  private final String[] eventsSubDirs = new String[] { "query_data", "dag_meta", "dag_data", "app_data" };
+
+  @Before
+  public void setup() throws Exception {
+    hiveConf = new HiveConf(TestHiveProtoEventsCleanerTask.class);
+    String tmpFolder = folder.newFolder().getAbsolutePath();
+    hiveConf.setVar(HiveConf.ConfVars.HIVE_PROTO_EVENTS_BASE_PATH, tmpFolder + "/" + eventsSubDirs[0]);
+    HiveConf.setTimeVar(hiveConf, ConfVars.HIVE_PROTO_EVENTS_TTL, 2, TimeUnit.DAYS);
+
+    baseDir = new Path(tmpFolder);
+    fs = baseDir.getFileSystem(hiveConf);
+    cleanerTask = JavaUtils.newInstance(HiveProtoEventsCleanerTask.class);
+    cleanerTask.setConf(hiveConf);
+  }
+
+  /**
+   * Returns the current date, using the underlying clock in UTC time.
+   */
+  private LocalDate getNow() {
+    // Use UTC date to ensure reader date is same on all timezones.
+    return LocalDateTime.ofEpochSecond(clock.getTime() / 1000, 0, ZoneOffset.UTC).toLocalDate();
+  }
+
+  /**
+   * Returns the directory name for a given date.
+   */
+  public String getDirForDate(LocalDate date) {
+    return "date=" + DateTimeFormatter.ISO_LOCAL_DATE.format(date);
+  }
+
+  private void addDatePartition(Path basePath, LocalDate date) throws Exception {
+    if (!fs.exists(basePath)) {
+      fs.mkdirs(basePath);
+      fs.setPermission(basePath, FsPermission.createImmutable((short)01777));
+    }
+
+    Path datePtn = new Path(basePath, getDirForDate(date));
+    fs.mkdirs(datePtn);
+    fs.setPermission(datePtn, FsPermission.createImmutable((short) 01777));
+    FsPermission.setUMask(hiveConf, FsPermission.createImmutable((short) 0066));
+    Path partFile = new Path(datePtn, "data");
+    try (FSDataOutputStream out = fs.create(partFile)) {
+      out.writeInt(1000);
+    }
+  }
+
+  @Test
+  public void testCleanup() throws Exception {
+    int[] inRange = { 3, 5, 2, 1 }; // Must have one entry per eventsSubDirs
+    int[] outRange = { 2, 2, 2, 1 }; // Must have one entry per eventsSubDirs
+    LocalDate today = getNow();
+
+    // Add partitions for the given range of dates from today to past.
+    for (int i = 0; i < inRange.length; i++) {
+      Path basePath = new Path(baseDir + "/" + eventsSubDirs[i]);
+      for (int j = 0; j < inRange[i]; j++) {
+        addDatePartition(basePath, today.minusDays(j));
+      }
+    }
+
+    // Run one cleanup pass.
+    cleanerTask.run();
+
+    // Every remaining partition must be strictly newer than the cutoff date (today - TTL of 2 days).
+    String expiredPtn = getDirForDate(today.minusDays(2));
+    for (int i = 0; i < inRange.length; i++) {
+      Path basePath = new Path(baseDir + "/" + eventsSubDirs[i]);
+      FileStatus[] statuses = fs.listStatus(basePath);
+
+      // If the UTC date rolls over between setup and cleanup, one more partition expires,
+      // so both counts are accepted.
+      assertTrue((statuses.length == outRange[i]) || (statuses.length == (outRange[i] - 1)));
+      for (FileStatus status : statuses) {
+        assertTrue(status.getPath().getName().compareTo(expiredPtn) > 0);
+      }
+    }
+  }
+}


http://git-wip-us.apache.org/repos/asf/hive/blob/6311e0b0/metastore/src/java/org/apache/hadoop/hive/metastore/HiveProtoEventsCleanerTask.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveProtoEventsCleanerTask.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveProtoEventsCleanerTask.java
new file mode 100644
index 0000000..2a772e2
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveProtoEventsCleanerTask.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.security.PrivilegedExceptionAction;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Metastore housekeeping task that purges expired proto event directories.
+ *
+ * <p>The directory named by {@link ConfVars#HIVE_PROTO_EVENTS_BASE_PATH} (written by
+ * HiveProtoLoggingHook) and its sibling {@code dag_meta}, {@code dag_data} and {@code app_data}
+ * directories are scanned on every run. Each {@code date=YYYY-MM-DD} sub-directory whose date is
+ * on or before the UTC date of {@code now - HIVE_PROTO_EVENTS_TTL} is deleted recursively,
+ * impersonating the directory owner when it differs from the user this process acts as.
+ */
+public class HiveProtoEventsCleanerTask implements MetastoreTaskThread {
+  public static final Logger LOG = LoggerFactory.getLogger(HiveProtoEventsCleanerTask.class);
+
+  /** Name prefix of the date partition directories written by the proto event loggers. */
+  private static final String DATE_PTN_PREFIX = "date=";
+  private static final SystemClock clock = SystemClock.getInstance();
+
+  // eventsSubDirs[0] is the expected name of the Hive events dir; the others are its siblings.
+  private final String[] eventsSubDirs = new String[] { "query_data", "dag_meta", "dag_data", "app_data" };
+  private final List<Path> eventsBasePaths = new ArrayList<>();
+  private Configuration conf;
+  private long ttl;
+
+  /**
+   * Resolves the directories to clean and the TTL from {@code conf}. A blank
+   * {@link ConfVars#HIVE_PROTO_EVENTS_BASE_PATH} (or a root path) leaves the task disabled.
+   */
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+    // Start from scratch so that calling setConf() again neither duplicates nor keeps stale paths.
+    eventsBasePaths.clear();
+
+    String hiveEventsDir = HiveConf.getVar(conf, ConfVars.HIVE_PROTO_EVENTS_BASE_PATH);
+    if (StringUtils.isBlank(hiveEventsDir)) {
+      return;
+    }
+    Path hiveEventsBasePath = new Path(hiveEventsDir);
+    Path baseDir = hiveEventsBasePath.getParent();
+    if (baseDir == null) {
+      LOG.warn("{} is set to a root path ({}); proto events cleanup is disabled.",
+          ConfVars.HIVE_PROTO_EVENTS_BASE_PATH.varname, hiveEventsBasePath);
+      return;
+    }
+    if (!eventsSubDirs[0].equals(hiveEventsBasePath.getName())) {
+      LOG.warn("{} is expected to point to a '{}' directory but is {}; cleaning it as configured.",
+          ConfVars.HIVE_PROTO_EVENTS_BASE_PATH.varname, eventsSubDirs[0], hiveEventsBasePath);
+    }
+    // Always clean the configured Hive dir itself (not a sibling guessed from its parent), plus the
+    // sibling dirs holding the remaining event types.
+    eventsBasePaths.add(hiveEventsBasePath);
+    for (int i = 1; i < eventsSubDirs.length; i++) {
+      eventsBasePaths.add(new Path(baseDir, eventsSubDirs[i]));
+    }
+    ttl = HiveConf.getTimeVar(conf, ConfVars.HIVE_PROTO_EVENTS_TTL, TimeUnit.MILLISECONDS);
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  /** Returns {@link ConfVars#HIVE_PROTO_EVENTS_CLEAN_FREQ} expressed in {@code unit}. */
+  @Override
+  public long runFrequency(TimeUnit unit) {
+    return HiveConf.getTimeVar(conf, ConfVars.HIVE_PROTO_EVENTS_CLEAN_FREQ, unit);
+  }
+
+  @Override
+  public void run() {
+    // If Hive proto logging is not enabled, then nothing to be cleaned-up.
+    if (eventsBasePaths.isEmpty()) {
+      return;
+    }
+
+    // The cutoff is recomputed on every run and kept local to it (not in a static field), so
+    // concurrent runs or several task instances never see each other's cutoff.
+    PathFilter expiredFilter = expiredDatePartitionsFilter(computeExpiredDatePtn(ttl));
+    for (Path basePath : eventsBasePaths) {
+      cleanupDir(basePath, expiredFilter);
+    }
+  }
+
+  /**
+   * Computes the newest expired date partition name, using the underlying clock in UTC time.
+   *
+   * @param ttl time-to-live of the event files, in milliseconds
+   * @return {@code "date=YYYY-MM-DD"} for the UTC date of {@code now - ttl}
+   */
+  private static String computeExpiredDatePtn(long ttl) {
+    // Use UTC date to ensure reader date is same on all timezones.
+    LocalDate expiredDate
+        = LocalDateTime.ofEpochSecond((clock.getTime() - ttl) / 1000, 0, ZoneOffset.UTC).toLocalDate();
+    return DATE_PTN_PREFIX + DateTimeFormatter.ISO_LOCAL_DATE.format(expiredDate);
+  }
+
+  /**
+   * Returns a filter accepting only date partitions named on or before {@code expiredDatePtn}.
+   * ISO-8601 local dates are zero-padded, so string order equals chronological order.
+   */
+  private static PathFilter expiredDatePartitionsFilter(String expiredDatePtn) {
+    return path -> {
+      String dirName = path.getName();
+      return dirName.startsWith(DATE_PTN_PREFIX) && dirName.compareTo(expiredDatePtn) <= 0;
+    };
+  }
+
+  /**
+   * Deletes the expired date partitions directly under {@code eventsBasePath}. Failures are logged
+   * and do not stop the cleanup of the remaining directories.
+   */
+  private void cleanupDir(Path eventsBasePath, PathFilter expiredFilter) {
+    LOG.debug("Trying to delete expired proto events from {}", eventsBasePath);
+    try {
+      FileSystem fs = FileSystem.get(eventsBasePath.toUri(), conf);
+      if (!fs.exists(eventsBasePath)) {
+        return;
+      }
+      for (FileStatus dir : fs.listStatus(eventsBasePath, expiredFilter)) {
+        try {
+          deleteDirByOwner(fs, dir);
+          LOG.info("Deleted expired proto events dir: {}", dir.getPath());
+        } catch (IOException ioe) {
+          // Log error and continue to delete other expired dirs.
+          LOG.error("Error deleting expired proto events dir " + dir.getPath(), ioe);
+        }
+      }
+    } catch (IOException e) {
+      LOG.error("Error while trying to delete expired proto events from " + eventsBasePath, e);
+    }
+  }
+
+  /**
+   * Recursively deletes the events dir. If its owner differs from the user {@code fs} acts as,
+   * the delete runs as the owner through a proxy user of the login user.
+   *
+   * @throws InterruptedIOException if interrupted while deleting as the proxy user; the thread's
+   *     interrupt status is restored
+   */
+  private void deleteDirByOwner(FileSystem fs, FileStatus eventsDir) throws IOException {
+    Path dirPath = eventsDir.getPath();
+    String owner = eventsDir.getOwner();
+    // FileSystem.get() binds fs to the current Hadoop user, which may differ from the OS user.
+    if (owner.equals(UserGroupInformation.getCurrentUser().getShortUserName())) {
+      fs.delete(dirPath, true);
+      return;
+    }
+    LOG.info("Deleting {} as user {}", dirPath, owner);
+    UserGroupInformation ugi = UserGroupInformation.createProxyUser(owner,
+        UserGroupInformation.getLoginUser());
+    try {
+      ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
+        // New FileSystem object to be obtained in user context for doAs flow.
+        try (FileSystem doAsFs = FileSystem.newInstance(dirPath.toUri(), conf)) {
+          doAsFs.delete(dirPath, true);
+        }
+        return null;
+      });
+    } catch (InterruptedException ie) {
+      // Restore the interrupt status and report failure instead of letting the caller log success.
+      Thread.currentThread().interrupt();
+      InterruptedIOException iioe =
+          new InterruptedIOException("Could not delete " + dirPath + " for UGI: " + ugi);
+      iioe.initCause(ie);
+      throw iioe;
+    }
+  }
+}


http://git-wip-us.apache.org/repos/asf/hive/blob/6311e0b0/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
index 0820bea..f463437 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
@@ -102,8 +102,10 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import org.apache.commons.compress.utils.IOUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.exec.ExplainTask;
@@ -158,9 +160,7 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
         .collect(Collectors.toSet());
   }
 
-  public static final String HIVE_EVENTS_BASE_PATH = "hive.hook.proto.base-directory";
-  public static final String HIVE_HOOK_PROTO_QUEUE_CAPACITY = "hive.hook.proto.queue.capacity";
-  public static final int HIVE_HOOK_PROTO_QUEUE_CAPACITY_DEFAULT = 64;
+  private static final int HIVE_HOOK_PROTO_QUEUE_CAPACITY_DEFAULT = 64;
   private static final int WAIT_TIME = 5;
 
   public enum EventType {
@@ -190,9 +190,10 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
       this.clock = clock;
       // randomUUID is slow, since its cryptographically
secure, only first query will take time.
       this.logFileName = "hive_" + UUID.randomUUID().toString();
-      String baseDir = conf.get(HIVE_EVENTS_BASE_PATH);
-      if (baseDir == null) {
-        LOG.error(HIVE_EVENTS_BASE_PATH + " is not set, logging disabled.");
+      String baseDir = conf.getVar(ConfVars.HIVE_PROTO_EVENTS_BASE_PATH);
+      if (StringUtils.isBlank(baseDir)) {
+        baseDir = null;
+        LOG.error(ConfVars.HIVE_PROTO_EVENTS_BASE_PATH.varname + " is not set, logging disabled.");
       }
 
       DatePartitionedLogger<HiveHookEventProto> tmpLogger = null;
@@ -211,7 +212,7 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
         return;
       }
 
-      int queueCapacity = conf.getInt(HIVE_HOOK_PROTO_QUEUE_CAPACITY,
-          HIVE_HOOK_PROTO_QUEUE_CAPACITY_DEFAULT);
+      // The default (64) is declared once, on ConfVars.HIVE_PROTO_EVENTS_QUEUE_CAPACITY.
+      int queueCapacity = conf.getIntVar(ConfVars.HIVE_PROTO_EVENTS_QUEUE_CAPACITY);
 
       ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true)


http://git-wip-us.apache.org/repos/asf/hive/blob/6311e0b0/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java b/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java
index 96fb73c..8124528 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java
@@ -64,7 +64,7 @@ public class TestHiveProtoLoggingHook {
   public void setup() throws Exception {
     conf = new HiveConf();
     tmpFolder = folder.newFolder().getAbsolutePath();
-    conf.set(HiveProtoLoggingHook.HIVE_EVENTS_BASE_PATH, tmpFolder);
+    conf.setVar(HiveConf.ConfVars.HIVE_PROTO_EVENTS_BASE_PATH, tmpFolder);
     QueryState state = new QueryState.Builder().withHiveConf(conf).build();
     @SuppressWarnings("serial")
     QueryPlan queryPlan = new QueryPlan(HiveOperation.QUERY) {};


http://git-wip-us.apache.org/repos/asf/hive/blob/6311e0b0/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index 19da432..74a301f 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -763,7 +763,8 @@ public class MetastoreConf {
             EventCleanerTask.class.getName() + "," + RuntimeStatsCleanerTask.class.getName() + "," +
             "org.apache.hadoop.hive.metastore.repl.DumpDirCleanerTask" + "," +
             MaterializationsCacheCleanerTask.class.getName() + "," +
-            MaterializationsRebuildLockCleanerTask.class.getName() + "," + RuntimeStatsCleanerTask.class.getName(),
+            MaterializationsRebuildLockCleanerTask.class.getName() + "," +
+            "org.apache.hadoop.hive.metastore.HiveProtoEventsCleanerTask",
             "Comma separated list of tasks that will be started in separate threads. These will " +
             "always be started, regardless of whether the metastore is running in embedded mode " +
             "or in server mode. They must implement " + MetastoreTaskThread.class.getName()),