http://git-wip-us.apache.org/repos/asf/hive/blob/e08cc6e6/ql/src/java/org/apache/hadoop/hive/ql/hooks/DatePartitionedLogger.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/DatePartitionedLogger.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/DatePartitionedLogger.java
new file mode 100644
index 0000000..c9d1b93
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/DatePartitionedLogger.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.hooks;
+
+import java.io.IOException;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.yarn.util.Clock;
+
+import com.google.protobuf.MessageLite;
+import com.google.protobuf.Parser;
+
+/**
+ * Class to create proto readers and writers for a date-partitioned directory structure.
+ *
+ * @param <T> The proto message type.
+ */
+public class DatePartitionedLogger<T extends MessageLite> {
+  // Everyone has permission to write, but with the sticky bit set so that deletes are restricted.
+  // This is required since the path is the same for all users and everyone writes into it.
+  private static final FsPermission DIR_PERMISSION = FsPermission.createImmutable((short)01777);
+
+  private final Parser<T> parser;
+  private final Path basePath;
+  private final Configuration conf;
+  private final Clock clock;
+  private final FileSystem fileSystem;
+
+  public DatePartitionedLogger(Parser<T> parser, Path baseDir, Configuration conf, Clock clock)
+      throws IOException {
+    this.conf = conf;
+    this.clock = clock;
+    this.parser = parser;
+    this.fileSystem = baseDir.getFileSystem(conf);
+    if (!fileSystem.exists(baseDir)) {
+      fileSystem.mkdirs(baseDir);
+      fileSystem.setPermission(baseDir, DIR_PERMISSION);
+    }
+    this.basePath = fileSystem.resolvePath(baseDir);
+  }
+
+  /**
+   * Creates a writer for the given fileName, using today's date.
+   */
+  public ProtoMessageWriter<T> getWriter(String fileName) throws IOException {
+    Path filePath = getPathForDate(getNow().toLocalDate(), fileName);
+    return new ProtoMessageWriter<>(conf, filePath, parser);
+  }
+
+  /**
+   * Creates a reader for the given filePath; no validation is performed.
+   */
+  public ProtoMessageReader<T> getReader(Path filePath) throws IOException {
+    return new ProtoMessageReader<>(conf, filePath, parser);
+  }
+
+  /**
+   * Creates a path for the given date and fileName. This can be used to create a reader.
+   */
+  public Path getPathForDate(LocalDate date, String fileName) throws IOException {
+    Path path = new Path(basePath, getDirForDate(date));
+    if (!fileSystem.exists(path)) {
+      fileSystem.mkdirs(path);
+      fileSystem.setPermission(path, DIR_PERMISSION);
+    }
+    return new Path(path, fileName);
+  }
+
+  /**
+   * Extracts the date from the directory name; this should be a directory created by this class.
+   */
+  public LocalDate getDateFromDir(String dirName) {
+    if (!dirName.startsWith("date=")) {
+      throw new IllegalArgumentException("Invalid directory: " + dirName);
+    }
+    return LocalDate.parse(dirName.substring(5), DateTimeFormatter.ISO_LOCAL_DATE);
+  }
+
+  /**
+   * Returns the directory name for a given date.
+   */
+  public String getDirForDate(LocalDate date) {
+    return "date=" + DateTimeFormatter.ISO_LOCAL_DATE.format(date);
+  }
+
+  /**
+   * Finds the next available directory after the given directory.
+   */
+  public String getNextDirectory(String currentDir) throws IOException {
+    // Fast check: if the next day's directory exists, return it.
+    String nextDate = getDirForDate(getDateFromDir(currentDir).plusDays(1));
+    if (fileSystem.exists(new Path(basePath, nextDate))) {
+      return nextDate;
+    }
+    // Have to scan the directory to find the min date greater than currentDir.
+    String dirName = null;
+    for (FileStatus status : fileSystem.listStatus(basePath)) {
+      String name = status.getPath().getName();
+      // String comparison is good enough, since it is of the form date=yyyy-MM-dd.
+      if (name.compareTo(currentDir) > 0 && (dirName == null || name.compareTo(dirName) < 0)) {
+        dirName = name;
+      }
+    }
+    return dirName;
+  }
+
+  /**
+   * Returns new or changed files in the given directory. The offsets are used to find
+   * changed files.
+   */
+  public List<Path> scanForChangedFiles(String subDir, Map<String, Long> currentOffsets)
+      throws IOException {
+    Path dirPath = new Path(basePath, subDir);
+    List<Path> newFiles = new ArrayList<>();
+    if (!fileSystem.exists(dirPath)) {
+      return newFiles;
+    }
+    for (FileStatus status : fileSystem.listStatus(dirPath)) {
+      String fileName = status.getPath().getName();
+      Long offset = currentOffsets.get(fileName);
+      // If the offset was never added or offset < fileSize.
+      if (offset == null || offset < status.getLen()) {
+        newFiles.add(new Path(dirPath, fileName));
+      }
+    }
+    return newFiles;
+  }
+
+  /**
+   * Returns the current time from the underlying clock, in UTC.
+   */
+  public LocalDateTime getNow() {
+    // Use a UTC date to ensure the reader date is the same in all timezones.
+    return LocalDateTime.ofEpochSecond(clock.getTime() / 1000, 0, ZoneOffset.UTC);
+  }
+
+  public Configuration getConfig() {
+    return conf;
+  }
+}
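A minimal usage sketch may help here (illustrative only: the base path, file name, and field values are made up; HiveHookEventProto is the generated class from HiveEvents.proto further down, and imports are omitted):

    Configuration conf = new Configuration();
    DatePartitionedLogger<HiveHookEventProto> eventsLogger = new DatePartitionedLogger<>(
        HiveHookEventProto.PARSER, new Path("/tmp/hive/events"), conf, SystemClock.getInstance());

    // Write one event into today's date=yyyy-MM-dd directory.
    try (ProtoMessageWriter<HiveHookEventProto> writer = eventsLogger.getWriter("hive_demo_file")) {
      writer.writeProto(HiveHookEventProto.newBuilder()
          .setEventType("QUERY_SUBMITTED")
          .setHiveQueryId("demo_query_id")
          .build());
    }

    // Build the path for the same date and file, then read the events back.
    Path file = eventsLogger.getPathForDate(eventsLogger.getNow().toLocalDate(), "hive_demo_file");
    try (ProtoMessageReader<HiveHookEventProto> reader = eventsLogger.getReader(file)) {
      HiveHookEventProto event;
      while ((event = reader.readEvent()) != null) {
        System.out.println(event.getEventType() + " " + event.getHiveQueryId());
      }
    }

Note that ProtoMessageReader.readEvent() returns null at end of file, and its getOffset()/setOffset() pair with scanForChangedFiles() above for incremental consumers that remember how far they have read.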
http://git-wip-us.apache.org/repos/asf/hive/blob/e08cc6e6/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java new file mode 100644 index 0000000..1ae8194 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java @@ -0,0 +1,493 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.hooks; + +import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERDATABASE; +import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERDATABASE_OWNER; +import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERPARTITION_BUCKETNUM; +import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERPARTITION_FILEFORMAT; +import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERPARTITION_LOCATION; +import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERPARTITION_MERGEFILES; +import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERPARTITION_SERDEPROPERTIES; +import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERPARTITION_SERIALIZER; +import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERTABLE_ADDCOLS; +import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERTABLE_ADDCONSTRAINT; +import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERTABLE_ADDPARTS; +import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERTABLE_ARCHIVE; +import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERTABLE_BUCKETNUM; +import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERTABLE_CLUSTER_SORT; +import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERTABLE_COMPACT; +import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERTABLE_DROPCONSTRAINT; +import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERTABLE_DROPPARTS; +import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERTABLE_EXCHANGEPARTITION; +import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERTABLE_FILEFORMAT; +import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERTABLE_LOCATION; +import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERTABLE_MERGEFILES; +import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERTABLE_PARTCOLTYPE; +import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERTABLE_PROPERTIES; +import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERTABLE_RENAME; +import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERTABLE_RENAMECOL; +import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERTABLE_RENAMEPART; 
+import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERTABLE_REPLACECOLS; +import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERTABLE_SERDEPROPERTIES; +import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERTABLE_SERIALIZER; +import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERTABLE_SKEWED; +import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERTABLE_TOUCH; +import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERTABLE_UNARCHIVE; +import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERTABLE_UPDATEPARTSTATS; +import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERTABLE_UPDATETABLESTATS; +import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERTBLPART_SKEWED_LOCATION; +import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERVIEW_AS; +import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERVIEW_PROPERTIES; +import static org.apache.hadoop.hive.ql.plan.HiveOperation.ALTERVIEW_RENAME; +import static org.apache.hadoop.hive.ql.plan.HiveOperation.ANALYZE_TABLE; +import static org.apache.hadoop.hive.ql.plan.HiveOperation.CACHE_METADATA; +import static org.apache.hadoop.hive.ql.plan.HiveOperation.CREATEDATABASE; +import static org.apache.hadoop.hive.ql.plan.HiveOperation.CREATEFUNCTION; +import static org.apache.hadoop.hive.ql.plan.HiveOperation.CREATEMACRO; +import static org.apache.hadoop.hive.ql.plan.HiveOperation.CREATEROLE; +import static org.apache.hadoop.hive.ql.plan.HiveOperation.CREATETABLE; +import static org.apache.hadoop.hive.ql.plan.HiveOperation.CREATETABLE_AS_SELECT; +import static org.apache.hadoop.hive.ql.plan.HiveOperation.CREATEVIEW; +import static org.apache.hadoop.hive.ql.plan.HiveOperation.DROPDATABASE; +import static org.apache.hadoop.hive.ql.plan.HiveOperation.DROPFUNCTION; +import static org.apache.hadoop.hive.ql.plan.HiveOperation.DROPMACRO; +import static org.apache.hadoop.hive.ql.plan.HiveOperation.DROPROLE; +import static org.apache.hadoop.hive.ql.plan.HiveOperation.DROPTABLE; +import static org.apache.hadoop.hive.ql.plan.HiveOperation.DROPVIEW; +import static org.apache.hadoop.hive.ql.plan.HiveOperation.DROPVIEW_PROPERTIES; +import static org.apache.hadoop.hive.ql.plan.HiveOperation.EXPORT; +import static org.apache.hadoop.hive.ql.plan.HiveOperation.IMPORT; +import static org.apache.hadoop.hive.ql.plan.HiveOperation.KILL_QUERY; +import static org.apache.hadoop.hive.ql.plan.HiveOperation.LOAD; +import static org.apache.hadoop.hive.ql.plan.HiveOperation.LOCKTABLE; +import static org.apache.hadoop.hive.ql.plan.HiveOperation.MSCK; +import static org.apache.hadoop.hive.ql.plan.HiveOperation.QUERY; +import static org.apache.hadoop.hive.ql.plan.HiveOperation.RELOADFUNCTION; +import static org.apache.hadoop.hive.ql.plan.HiveOperation.TRUNCATETABLE; +import static org.apache.hadoop.hive.ql.plan.HiveOperation.UNLOCKTABLE; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import 
org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
+import org.apache.hadoop.hive.ql.QueryPlan;
+import org.apache.hadoop.hive.ql.exec.ExplainTask;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mr.ExecDriver;
+import org.apache.hadoop.hive.ql.exec.tez.TezTask;
+import org.apache.hadoop.hive.ql.hooks.proto.HiveHookEvents.HiveHookEventProto;
+import org.apache.hadoop.hive.ql.hooks.proto.HiveHookEvents.MapFieldEntry;
+import org.apache.hadoop.hive.ql.parse.ExplainConfiguration;
+import org.apache.hadoop.hive.ql.plan.ExplainWork;
+import org.apache.hadoop.hive.ql.plan.HiveOperation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.hive.common.util.ShutdownHookManager;
+import org.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Logs events from the Hive hook in a protobuf-serialized format, partitioned by date.
+ */
+public class HiveProtoLoggingHook implements ExecuteWithHookContext {
+  private static final Logger LOG = LoggerFactory.getLogger(HiveProtoLoggingHook.class.getName());
+  private static final Set<String> includedOperationSet;
+  private static final int VERSION = 1;
+
+  static {
+    // List of operations for which we log.
+    includedOperationSet = Arrays.stream(new HiveOperation[] { LOAD, EXPORT, IMPORT,
+        CREATEDATABASE, DROPDATABASE, DROPTABLE, MSCK, ALTERTABLE_ADDCOLS, ALTERTABLE_REPLACECOLS,
+        ALTERTABLE_RENAMECOL, ALTERTABLE_RENAMEPART, ALTERTABLE_UPDATEPARTSTATS,
+        ALTERTABLE_UPDATETABLESTATS, ALTERTABLE_RENAME, ALTERTABLE_DROPPARTS, ALTERTABLE_ADDPARTS,
+        ALTERTABLE_TOUCH, ALTERTABLE_ARCHIVE, ALTERTABLE_UNARCHIVE, ALTERTABLE_PROPERTIES,
+        ALTERTABLE_SERIALIZER, ALTERPARTITION_SERIALIZER, ALTERTABLE_SERDEPROPERTIES,
+        ALTERPARTITION_SERDEPROPERTIES, ALTERTABLE_CLUSTER_SORT, ANALYZE_TABLE, CACHE_METADATA,
+        ALTERTABLE_BUCKETNUM, ALTERPARTITION_BUCKETNUM, CREATEFUNCTION, DROPFUNCTION,
+        RELOADFUNCTION, CREATEMACRO, DROPMACRO, CREATEVIEW, DROPVIEW, ALTERVIEW_PROPERTIES,
+        DROPVIEW_PROPERTIES, LOCKTABLE, UNLOCKTABLE, CREATEROLE, DROPROLE, ALTERTABLE_FILEFORMAT,
+        ALTERPARTITION_FILEFORMAT, ALTERTABLE_LOCATION, ALTERPARTITION_LOCATION, CREATETABLE,
+        TRUNCATETABLE, CREATETABLE_AS_SELECT, QUERY, ALTERDATABASE, ALTERDATABASE_OWNER,
+        ALTERTABLE_MERGEFILES, ALTERPARTITION_MERGEFILES, ALTERTABLE_SKEWED,
+        ALTERTBLPART_SKEWED_LOCATION, ALTERTABLE_PARTCOLTYPE, ALTERTABLE_EXCHANGEPARTITION,
+        ALTERTABLE_DROPCONSTRAINT, ALTERTABLE_ADDCONSTRAINT, ALTERVIEW_RENAME, ALTERVIEW_AS,
+        ALTERTABLE_COMPACT, KILL_QUERY })
+            .map(HiveOperation::getOperationName)
+            .collect(Collectors.toSet());
+  }
+
+  public static final String HIVE_EVENTS_BASE_PATH = "hive.hook.proto.base-directory";
+  public static final String HIVE_HOOK_PROTO_QUEUE_CAPACITY = "hive.hook.proto.queue.capacity";
+  public static final int HIVE_HOOK_PROTO_QUEUE_CAPACITY_DEFAULT = 64;
+  private static final int WAIT_TIME = 5;
+
+  public enum EventType {
+    QUERY_SUBMITTED, QUERY_COMPLETED
+  }
+
+  public enum OtherInfoType {
+    QUERY, STATUS, TEZ, MAPRED, INVOKER_INFO, SESSION_ID, THREAD_NAME, VERSION, CLIENT_IP_ADDRESS,
+    HIVE_ADDRESS, HIVE_INSTANCE_TYPE, CONF, PERF, LLAP_APP_ID
+  }
+
+  public enum ExecutionMode {
+    MR, TEZ, LLAP, SPARK, NONE
+  }
+
+  static class EventLogger {
+    private final Clock clock;
+    private final String logFileName;
+    private final DatePartitionedLogger<HiveHookEventProto> logger;
+    private final ExecutorService eventHandler;
+    private final ExecutorService logWriter;
+
+    EventLogger(HiveConf conf, Clock clock) {
+      this.clock = clock;
+      // randomUUID is slow since it is cryptographically secure; only the first query will take time.
+      this.logFileName = "hive_" + UUID.randomUUID().toString();
+      String baseDir = conf.get(HIVE_EVENTS_BASE_PATH);
+      if (baseDir == null) {
+        LOG.error(HIVE_EVENTS_BASE_PATH + " is not set, logging disabled.");
+      }
+
+      DatePartitionedLogger<HiveHookEventProto> tmpLogger = null;
+      try {
+        if (baseDir != null) {
+          tmpLogger = new DatePartitionedLogger<>(HiveHookEventProto.PARSER, new Path(baseDir),
+              conf, clock);
+        }
+      } catch (IOException e) {
+        LOG.error("Unable to initialize logger, logging disabled.", e);
+      }
+      this.logger = tmpLogger;
+      if (logger == null) {
+        eventHandler = null;
+        logWriter = null;
+        return;
+      }
+
+      int queueCapacity = conf.getInt(HIVE_HOOK_PROTO_QUEUE_CAPACITY,
+          HIVE_HOOK_PROTO_QUEUE_CAPACITY_DEFAULT);
+
+      ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true)
+          .setNameFormat("Hive Hook Proto Event Handler %d").build();
+      eventHandler = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
+          new LinkedBlockingQueue<Runnable>(queueCapacity), threadFactory);
+
+      threadFactory = new ThreadFactoryBuilder().setDaemon(true)
+          .setNameFormat("Hive Hook Proto Log Writer %d").build();
+      logWriter = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
+          new LinkedBlockingQueue<Runnable>(queueCapacity), threadFactory);
+    }
+
+    void shutdown() {
+      // Wait for all the events to be written out; the order of the services is important.
+      for (ExecutorService service : new ExecutorService[] {eventHandler, logWriter}) {
+        if (service == null) {
+          continue;
+        }
+        service.shutdown();
+        try {
+          service.awaitTermination(WAIT_TIME, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+          LOG.warn("Got interrupted exception while waiting for events to be flushed", e);
+        }
+      }
+    }
+
+    void handle(HookContext hookContext) {
+      if (logger == null) {
+        return;
+      }
+      try {
+        eventHandler.execute(() -> generateEvent(hookContext));
+      } catch (RejectedExecutionException e) {
+        LOG.warn("Handler queue full, ignoring event: " + hookContext.getHookType());
+      }
+    }
+
+    private void generateEvent(HookContext hookContext) {
+      QueryPlan plan = hookContext.getQueryPlan();
+      if (plan == null) {
+        LOG.debug("Received null query plan.");
+        return;
+      }
+      if (!includedOperationSet.contains(plan.getOperationName())) {
+        LOG.debug("Not logging events of operation type: {}", plan.getOperationName());
+        return;
+      }
+      HiveHookEventProto event;
+      switch (hookContext.getHookType()) {
+      case PRE_EXEC_HOOK:
+        event = getPreHookEvent(hookContext);
+        break;
+      case POST_EXEC_HOOK:
+        event = getPostHookEvent(hookContext, true);
+        break;
+      case ON_FAILURE_HOOK:
+        event = getPostHookEvent(hookContext, false);
+        break;
+      default:
+        LOG.warn("Ignoring event of type: {}", hookContext.getHookType());
+        event = null;
+      }
+      if (event != null) {
+        try {
+          logWriter.execute(() -> writeEvent(event));
+        } catch (RejectedExecutionException e) {
+          LOG.warn("Writer queue full, ignoring event {} for query {}",
+              hookContext.getHookType(), plan.getQueryId());
+        }
+      }
+    }
+
+    private void writeEvent(HiveHookEventProto event) {
+      try (ProtoMessageWriter<HiveHookEventProto> writer = logger.getWriter(logFileName)) {
+        writer.writeProto(event);
+        // hflush() does not work; hence we open and close the file for every event.
+        // writer.hflush();
+      } catch (IOException e) {
+        LOG.error("Error writing proto message for query {}, eventType: {}: ",
+            event.getHiveQueryId(), event.getEventType(), e);
+      }
+    }
+
+    private HiveHookEventProto getPreHookEvent(HookContext hookContext) {
+      QueryPlan plan = hookContext.getQueryPlan();
+      LOG.info("Received pre-hook notification for: " + plan.getQueryId());
+
+      // Make a copy so that we do not modify the hookContext conf.
+      HiveConf conf = new HiveConf(hookContext.getConf());
+      List<ExecDriver> mrTasks = Utilities.getMRTasks(plan.getRootTasks());
+      List<TezTask> tezTasks = Utilities.getTezTasks(plan.getRootTasks());
+      ExecutionMode executionMode = getExecutionMode(plan, mrTasks, tezTasks);
+
+      HiveHookEventProto.Builder builder = HiveHookEventProto.newBuilder();
+      builder.setEventType(EventType.QUERY_SUBMITTED.name());
+      builder.setTimestamp(plan.getQueryStartTime());
+      builder.setHiveQueryId(plan.getQueryId());
+      builder.setUser(getUser(hookContext));
+      builder.setRequestUser(getRequestUser(hookContext));
+      builder.setQueue(conf.get("mapreduce.job.queuename"));
+      builder.setExecutionMode(executionMode.name());
+      builder.addAllTablesRead(getTablesFromEntitySet(hookContext.getInputs()));
+      builder.addAllTablesWritten(getTablesFromEntitySet(hookContext.getOutputs()));
+      if (hookContext.getOperationId() != null) {
+        builder.setOperationId(hookContext.getOperationId());
+      }
+
+      try {
+        JSONObject queryObj = new JSONObject();
+        queryObj.put("queryText", plan.getQueryStr());
+        queryObj.put("queryPlan", getExplainPlan(plan, conf, hookContext));
+        addMapEntry(builder, OtherInfoType.QUERY, queryObj.toString());
+      } catch (Exception e) {
+        LOG.error("Unexpected exception while serializing json.", e);
+      }
+
+      addMapEntry(builder, OtherInfoType.TEZ, Boolean.toString(tezTasks.size() > 0));
+      addMapEntry(builder, OtherInfoType.MAPRED, Boolean.toString(mrTasks.size() > 0));
+      addMapEntry(builder, OtherInfoType.SESSION_ID, hookContext.getSessionId());
+      String logID = conf.getLogIdVar(hookContext.getSessionId());
+      addMapEntry(builder, OtherInfoType.INVOKER_INFO, logID);
+      addMapEntry(builder, OtherInfoType.THREAD_NAME, hookContext.getThreadId());
+      addMapEntry(builder, OtherInfoType.VERSION, Integer.toString(VERSION));
+      addMapEntry(builder, OtherInfoType.CLIENT_IP_ADDRESS, hookContext.getIpAddress());
+
+      String hiveInstanceAddress = hookContext.getHiveInstanceAddress();
+      if (hiveInstanceAddress == null) {
+        try {
+          hiveInstanceAddress = InetAddress.getLocalHost().getHostAddress();
+        } catch (UnknownHostException e) {
+          LOG.error("Error trying to get localhost address: ", e);
+        }
+      }
+      addMapEntry(builder, OtherInfoType.HIVE_ADDRESS, hiveInstanceAddress);
+
+      String hiveInstanceType = hookContext.isHiveServerQuery() ?
"HS2" : "CLI"; + addMapEntry(builder, OtherInfoType.HIVE_INSTANCE_TYPE, hiveInstanceType); + + ApplicationId llapId = determineLlapId(conf, executionMode); + if (llapId != null) { + addMapEntry(builder, OtherInfoType.LLAP_APP_ID, llapId.toString()); + } + + conf.stripHiddenConfigurations(conf); + JSONObject confObj = new JSONObject(); + for (Map.Entry<String, String> setting : conf) { + confObj.put(setting.getKey(), setting.getValue()); + } + addMapEntry(builder, OtherInfoType.CONF, confObj.toString()); + return builder.build(); + } + + private HiveHookEventProto getPostHookEvent(HookContext hookContext, boolean success) { + QueryPlan plan = hookContext.getQueryPlan(); + LOG.info("Received post-hook notification for: " + plan.getQueryId()); + + HiveHookEventProto.Builder builder = HiveHookEventProto.newBuilder(); + builder.setEventType(EventType.QUERY_COMPLETED.name()); + builder.setTimestamp(clock.getTime()); + builder.setHiveQueryId(plan.getQueryId()); + builder.setUser(getUser(hookContext)); + builder.setRequestUser(getRequestUser(hookContext)); + if (hookContext.getOperationId() != null) { + builder.setOperationId(hookContext.getOperationId()); + } + addMapEntry(builder, OtherInfoType.STATUS, Boolean.toString(success)); + JSONObject perfObj = new JSONObject(hookContext.getPerfLogger().getEndTimes()); + addMapEntry(builder, OtherInfoType.PERF, perfObj.toString()); + + return builder.build(); + } + + private void addMapEntry(HiveHookEventProto.Builder builder, OtherInfoType key, String value) { + if (value != null) { + builder.addOtherInfo( + MapFieldEntry.newBuilder().setKey(key.name()).setValue(value).build()); + } + } + + private String getUser(HookContext hookContext) { + return hookContext.getUgi().getShortUserName(); + } + + private String getRequestUser(HookContext hookContext) { + String requestuser = hookContext.getUserName(); + if (requestuser == null) { + requestuser = hookContext.getUgi().getUserName(); + } + return requestuser; + } + + private List<String> getTablesFromEntitySet(Set<? extends Entity> entities) { + List<String> tableNames = new ArrayList<>(); + for (Entity entity : entities) { + if (entity.getType() == Entity.Type.TABLE) { + tableNames.add(entity.getTable().getDbName() + "." + entity.getTable().getTableName()); + } + } + return tableNames; + } + + private ExecutionMode getExecutionMode(QueryPlan plan, List<ExecDriver> mrTasks, + List<TezTask> tezTasks) { + if (tezTasks.size() > 0) { + // Need to go in and check if any of the tasks is running in LLAP mode. + for (TezTask tezTask : tezTasks) { + if (tezTask.getWork().getLlapMode()) { + return ExecutionMode.LLAP; + } + } + return ExecutionMode.TEZ; + } else if (mrTasks.size() > 0) { + return ExecutionMode.MR; + } else if (Utilities.getSparkTasks(plan.getRootTasks()).size() > 0) { + return ExecutionMode.SPARK; + } else { + return ExecutionMode.NONE; + } + } + + private JSONObject getExplainPlan(QueryPlan plan, HiveConf conf, HookContext hookContext) + throws Exception { + // Get explain plan for the query. 
+      ExplainConfiguration config = new ExplainConfiguration();
+      config.setFormatted(true);
+      ExplainWork work = new ExplainWork(null, // resFile
+          null, // pCtx
+          plan.getRootTasks(), // RootTasks
+          plan.getFetchTask(), // FetchTask
+          null, // analyzer
+          config, // explainConfig
+          null // cboInfo
+      );
+      ExplainTask explain = (ExplainTask) TaskFactory.get(work, conf);
+      explain.initialize(hookContext.getQueryState(), plan, null, null);
+      return explain.getJSONPlan(null, work);
+    }
+
+    private ApplicationId determineLlapId(HiveConf conf, ExecutionMode mode) {
+      // Note: for now, LLAP is only supported in Tez tasks, so this will never apply to MR;
+      // other modes may be added here, although this is only needed for extra debug information.
+      if (mode == ExecutionMode.LLAP) {
+        // In HS2, the client should have been cached already for the common case.
+        // Otherwise, this may actually introduce delay to compilation for the first query.
+        String hosts = HiveConf.getVar(conf, HiveConf.ConfVars.LLAP_DAEMON_SERVICE_HOSTS);
+        if (hosts != null && !hosts.isEmpty()) {
+          try {
+            return LlapRegistryService.getClient(conf).getApplicationId();
+          } catch (IOException e) {
+            LOG.error("Error trying to get llap instance", e);
+          }
+        } else {
+          LOG.info("Cannot determine LLAP instance on client - service hosts are not set");
+          return null;
+        }
+      }
+      return null;
+    }
+
+    // Singleton using double-checked locking (DCL).
+    private static volatile EventLogger instance;
+    static EventLogger getInstance(HiveConf conf) {
+      if (instance == null) {
+        synchronized (EventLogger.class) {
+          if (instance == null) {
+            instance = new EventLogger(conf, SystemClock.getInstance());
+            ShutdownHookManager.addShutdownHook(instance::shutdown);
+          }
+        }
+      }
+      return instance;
+    }
+  }
+
+  @Override
+  public void run(HookContext hookContext) throws Exception {
+    try {
+      EventLogger logger = EventLogger.getInstance(hookContext.getConf());
+      logger.handle(hookContext);
+    } catch (Exception e) {
+      LOG.error("Got exception while processing event: ", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/e08cc6e6/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageReader.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageReader.java
new file mode 100644
index 0000000..1c4296c
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageReader.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.hadoop.hive.ql.hooks; + +import java.io.Closeable; +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.SequenceFile; + +import com.google.protobuf.MessageLite; +import com.google.protobuf.Parser; + +public class ProtoMessageReader<T extends MessageLite> implements Closeable { + private final Path filePath; + private final SequenceFile.Reader reader; + private final ProtoMessageWritable<T> writable; + + ProtoMessageReader(Configuration conf, Path filePath, Parser<T> parser) throws IOException { + this.filePath = filePath; + this.reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(filePath)); + this.writable = new ProtoMessageWritable<>(parser); + } + + public Path getFilePath() { + return filePath; + } + + public void setOffset(long offset) throws IOException { + reader.seek(offset); + } + + public long getOffset() throws IOException { + return reader.getPosition(); + } + + public T readEvent() throws IOException { + if (!reader.next(NullWritable.get(), writable)) { + return null; + } + return writable.getMessage(); + } + + @Override + public void close() throws IOException { + reader.close(); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/e08cc6e6/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageWritable.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageWritable.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageWritable.java new file mode 100644 index 0000000..61d8449 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageWritable.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.hooks; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.hadoop.io.Writable; + +import com.google.protobuf.CodedInputStream; +import com.google.protobuf.CodedOutputStream; +import com.google.protobuf.MessageLite; +import com.google.protobuf.Parser; + +public class ProtoMessageWritable<T extends MessageLite> implements Writable { + private T message; + private final Parser<T> parser; + private DataOutputStream dos; + private CodedOutputStream cos; + private DataInputStream din; + private CodedInputStream cin; + + ProtoMessageWritable(Parser<T> parser) { + this.parser = parser; + } + + public T getMessage() { + return message; + } + + public void setMessage(T message) { + this.message = message; + } + + private static class DataOutputStream extends OutputStream { + DataOutput out; + @Override + public void write(int b) throws IOException { + out.write(b); + } + + @Override + public void write(byte b[], int off, int len) throws IOException { + out.write(b, off, len); + } + } + + @Override + public void write(DataOutput out) throws IOException { + if (dos == null) { + dos = new DataOutputStream(); + cos = CodedOutputStream.newInstance(dos); + } + dos.out = out; + cos.writeMessageNoTag(message); + cos.flush(); + } + + private static class DataInputStream extends InputStream { + DataInput in; + @Override + public int read() throws IOException { + try { + return in.readUnsignedByte(); + } catch (EOFException e) { + return -1; + } + } + } + + @Override + public void readFields(DataInput in) throws IOException { + if (din == null) { + din = new DataInputStream(); + cin = CodedInputStream.newInstance(din); + cin.setSizeLimit(Integer.MAX_VALUE); + } + din.in = in; + message = cin.readMessage(parser, null); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/e08cc6e6/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageWriter.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageWriter.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageWriter.java new file mode 100644 index 0000000..ed8de93 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageWriter.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.hooks; + +import java.io.Closeable; +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.SequenceFile.CompressionType; + +import com.google.protobuf.MessageLite; +import com.google.protobuf.Parser; + +public class ProtoMessageWriter<T extends MessageLite> implements Closeable { + private final Path filePath; + private final SequenceFile.Writer writer; + private final ProtoMessageWritable<T> writable; + + ProtoMessageWriter(Configuration conf, Path filePath, Parser<T> parser) throws IOException { + this.filePath = filePath; + this.writer = SequenceFile.createWriter( + conf, + SequenceFile.Writer.file(filePath), + SequenceFile.Writer.keyClass(NullWritable.class), + SequenceFile.Writer.valueClass(ProtoMessageWritable.class), + SequenceFile.Writer.appendIfExists(true), + SequenceFile.Writer.compression(CompressionType.RECORD)); + this.writable = new ProtoMessageWritable<>(parser); + } + + public Path getPath() { + return filePath; + } + + public long getOffset() throws IOException { + return writer.getLength(); + } + + public void writeProto(T message) throws IOException { + writable.setMessage(message); + writer.append(NullWritable.get(), writable); + } + + public void hflush() throws IOException { + writer.hflush(); + } + + @Override + public void close() throws IOException { + writer.close(); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/e08cc6e6/ql/src/protobuf/HiveEvents.proto ---------------------------------------------------------------------- diff --git a/ql/src/protobuf/HiveEvents.proto b/ql/src/protobuf/HiveEvents.proto new file mode 100644 index 0000000..eab0cc9 --- /dev/null +++ b/ql/src/protobuf/HiveEvents.proto @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +option java_package = "org.apache.hadoop.hive.ql.hooks.proto"; +option java_outer_classname = "HiveHookEvents"; + +message MapFieldEntry { + optional string key = 1; + optional string value = 2; +} + +message HiveHookEventProto { + optional string eventType = 1; + optional string hiveQueryId = 2; + optional int64 timestamp = 3; + optional string executionMode = 4; + optional string requestUser = 5; + optional string queue = 6; + optional string user = 7; + optional string operationId = 8; + repeated string tablesWritten = 9; + repeated string tablesRead = 10; + repeated MapFieldEntry otherInfo = 50; +} http://git-wip-us.apache.org/repos/asf/hive/blob/e08cc6e6/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java b/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java new file mode 100644 index 0000000..5e117fe --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java @@ -0,0 +1,170 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.hooks; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.QueryPlan; +import org.apache.hadoop.hive.ql.QueryState; +import org.apache.hadoop.hive.ql.hooks.HiveProtoLoggingHook.EventLogger; +import org.apache.hadoop.hive.ql.hooks.HiveProtoLoggingHook.EventType; +import org.apache.hadoop.hive.ql.hooks.HiveProtoLoggingHook.OtherInfoType; +import org.apache.hadoop.hive.ql.hooks.HookContext.HookType; +import org.apache.hadoop.hive.ql.hooks.proto.HiveHookEvents.HiveHookEventProto; +import org.apache.hadoop.hive.ql.hooks.proto.HiveHookEvents.MapFieldEntry; +import org.apache.hadoop.hive.ql.log.PerfLogger; +import org.apache.hadoop.hive.ql.plan.HiveOperation; +import org.apache.hadoop.yarn.util.SystemClock; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + + +public class TestHiveProtoLoggingHook { + + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + private HiveConf conf; + private HookContext context; + private String tmpFolder; + + @Before + public void setup() throws Exception { + conf = new HiveConf(); + tmpFolder = folder.newFolder().getAbsolutePath(); + conf.set(HiveProtoLoggingHook.HIVE_EVENTS_BASE_PATH, tmpFolder); + QueryState state = new QueryState.Builder().withHiveConf(conf).build(); + @SuppressWarnings("serial") + QueryPlan queryPlan = new QueryPlan(HiveOperation.QUERY) {}; + queryPlan.setQueryId("test_queryId"); + queryPlan.setQueryStartTime(1234L); + queryPlan.setRootTasks(new ArrayList<>()); + queryPlan.setInputs(new HashSet<>()); + queryPlan.setOutputs(new HashSet<>()); + + PerfLogger perf = PerfLogger.getPerfLogger(conf, true); + context = new HookContext(queryPlan, state, null, "test_user", "192.168.10.10", + "hive_addr", "test_op_id", "test_session_id", "test_thread_id", true, perf, null); + } + + @Test + public void testPreEventLog() throws Exception { + context.setHookType(HookType.PRE_EXEC_HOOK); + EventLogger evtLogger = new EventLogger(conf, SystemClock.getInstance()); + evtLogger.handle(context); + evtLogger.shutdown(); + + HiveHookEventProto event = loadEvent(conf, tmpFolder); + + Assert.assertEquals(EventType.QUERY_SUBMITTED.name(), event.getEventType()); + Assert.assertEquals(1234L, event.getTimestamp()); + Assert.assertEquals(System.getProperty("user.name"), event.getUser()); + Assert.assertEquals("test_user", event.getRequestUser()); + Assert.assertEquals("test_queryId", event.getHiveQueryId()); + Assert.assertEquals("test_op_id", event.getOperationId()); + Assert.assertEquals("NONE", event.getExecutionMode()); + + assertOtherInfo(event, OtherInfoType.TEZ, Boolean.FALSE.toString()); + assertOtherInfo(event, OtherInfoType.MAPRED, Boolean.FALSE.toString()); + assertOtherInfo(event, OtherInfoType.CLIENT_IP_ADDRESS, "192.168.10.10"); + assertOtherInfo(event, OtherInfoType.SESSION_ID, "test_session_id"); + assertOtherInfo(event, OtherInfoType.THREAD_NAME, "test_thread_id"); + assertOtherInfo(event, OtherInfoType.HIVE_INSTANCE_TYPE, "HS2"); + assertOtherInfo(event, OtherInfoType.HIVE_ADDRESS, "hive_addr"); + assertOtherInfo(event, OtherInfoType.CONF, null); + assertOtherInfo(event, OtherInfoType.QUERY, null); + } + + @Test + public void 
testPostEventLog() throws Exception { + context.setHookType(HookType.POST_EXEC_HOOK); + + EventLogger evtLogger = new EventLogger(conf, SystemClock.getInstance()); + evtLogger.handle(context); + evtLogger.shutdown(); + + HiveHookEventProto event = loadEvent(conf, tmpFolder); + Assert.assertEquals(EventType.QUERY_COMPLETED.name(), event.getEventType()); + Assert.assertEquals(System.getProperty("user.name"), event.getUser()); + Assert.assertEquals("test_user", event.getRequestUser()); + Assert.assertEquals("test_queryId", event.getHiveQueryId()); + Assert.assertEquals("test_op_id", event.getOperationId()); + + assertOtherInfo(event, OtherInfoType.STATUS, Boolean.TRUE.toString()); + assertOtherInfo(event, OtherInfoType.PERF, null); + } + + @Test + public void testFailureEventLog() throws Exception { + context.setHookType(HookType.ON_FAILURE_HOOK); + + EventLogger evtLogger = new EventLogger(conf, SystemClock.getInstance()); + evtLogger.handle(context); + evtLogger.shutdown(); + + HiveHookEventProto event = loadEvent(conf, tmpFolder); + Assert.assertEquals(EventType.QUERY_COMPLETED.name(), event.getEventType()); + Assert.assertEquals(System.getProperty("user.name"), event.getUser()); + Assert.assertEquals("test_user", event.getRequestUser()); + Assert.assertEquals("test_queryId", event.getHiveQueryId()); + Assert.assertEquals("test_op_id", event.getOperationId()); + + assertOtherInfo(event, OtherInfoType.STATUS, Boolean.FALSE.toString()); + assertOtherInfo(event, OtherInfoType.PERF, null); + } + + private HiveHookEventProto loadEvent(HiveConf conf, String tmpFolder) + throws IOException, FileNotFoundException { + Path path = new Path(tmpFolder); + FileSystem fs = path.getFileSystem(conf); + FileStatus[] status = fs.listStatus(path); + Assert.assertEquals(1, status.length); + status = fs.listStatus(status[0].getPath()); + Assert.assertEquals(1, status.length); + + DatePartitionedLogger<HiveHookEventProto> logger = new DatePartitionedLogger<>( + HiveHookEventProto.PARSER, path, conf, SystemClock.getInstance()); + ProtoMessageReader<HiveHookEventProto> reader = logger.getReader(status[0].getPath()); + HiveHookEventProto event = reader.readEvent(); + Assert.assertNotNull(event); + return event; + } + + private void assertOtherInfo(HiveHookEventProto event, OtherInfoType key, String value) { + for (MapFieldEntry otherInfo : event.getOtherInfoList()) { + if (otherInfo.getKey().equals(key.name())) { + if (value != null) { + Assert.assertEquals(value, otherInfo.getValue()); + } + return; + } + } + Assert.fail("Cannot find key: " + key); + } +}
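For completeness, enabling the hook would look roughly like this (a sketch: the hook handles pre-, post-, and failure-hook invocations, so it is registered in all three standard Hive hook lists; the base-directory value is illustrative):

    hive.exec.pre.hooks=org.apache.hadoop.hive.ql.hooks.HiveProtoLoggingHook
    hive.exec.post.hooks=org.apache.hadoop.hive.ql.hooks.HiveProtoLoggingHook
    hive.exec.failure.hooks=org.apache.hadoop.hive.ql.hooks.HiveProtoLoggingHook
    hive.hook.proto.base-directory=/warehouse/sys.db/query_data

If hive.hook.proto.base-directory is unset or the logger cannot be initialized, the EventLogger logs an error and disables itself. hive.hook.proto.queue.capacity (default 64) bounds both single-thread executor queues, and an event is dropped with a warning when either queue is full.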