Repository: hive Updated Branches: refs/heads/branch-3 abfbd1af3 -> 55bc28540
HIVE-19646: Filesystem closed error in HiveProtoLoggingHook (Harish JP, reviewd by Anishek Agarwal) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/55bc2854 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/55bc2854 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/55bc2854 Branch: refs/heads/branch-3 Commit: 55bc2854096f7666244a23e1d769a90fcda0863d Parents: abfbd1a Author: Anishek Agarwal <anis...@gmail.com> Authored: Thu Jun 14 16:40:48 2018 +0530 Committer: Anishek Agarwal <anis...@gmail.com> Committed: Thu Jun 14 16:40:48 2018 +0530 ---------------------------------------------------------------------- .../hive/ql/hooks/DatePartitionedLogger.java | 167 ----------------- .../hive/ql/hooks/HiveProtoLoggingHook.java | 32 +++- .../hive/ql/hooks/ProtoMessageReader.java | 66 ------- .../hive/ql/hooks/ProtoMessageWritable.java | 101 ----------- .../hive/ql/hooks/ProtoMessageWriter.java | 71 -------- .../logging/proto/DatePartitionedLogger.java | 177 +++++++++++++++++++ .../logging/proto/ProtoMessageReader.java | 66 +++++++ .../logging/proto/ProtoMessageWritable.java | 101 +++++++++++ .../logging/proto/ProtoMessageWriter.java | 71 ++++++++ .../dag/history/logging/proto/package-info.java | 23 +++ .../hive/ql/hooks/TestHiveProtoLoggingHook.java | 2 + 11 files changed, 465 insertions(+), 412 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/55bc2854/ql/src/java/org/apache/hadoop/hive/ql/hooks/DatePartitionedLogger.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/DatePartitionedLogger.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/DatePartitionedLogger.java deleted file mode 100644 index c9d1b93..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/DatePartitionedLogger.java +++ /dev/null @@ -1,167 +0,0 @@ -/** - * 
Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.ql.hooks; - -import java.io.IOException; -import java.time.LocalDate; -import java.time.LocalDateTime; -import java.time.ZoneOffset; -import java.time.format.DateTimeFormatter; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.yarn.util.Clock; - -import com.google.protobuf.MessageLite; -import com.google.protobuf.Parser; - -/** - * Class to create proto reader and writer for a date partitioned directory structure. - * - * @param <T> The proto message type. - */ -public class DatePartitionedLogger<T extends MessageLite> { - // Everyone has permission to write, but with sticky set so that delete is restricted. - // This is required, since the path is same for all users and everyone writes into it. 
- private static final FsPermission DIR_PERMISSION = FsPermission.createImmutable((short)01777); - - private final Parser<T> parser; - private final Path basePath; - private final Configuration conf; - private final Clock clock; - private final FileSystem fileSystem; - - public DatePartitionedLogger(Parser<T> parser, Path baseDir, Configuration conf, Clock clock) - throws IOException { - this.conf = conf; - this.clock = clock; - this.parser = parser; - this.fileSystem = baseDir.getFileSystem(conf); - if (!fileSystem.exists(baseDir)) { - fileSystem.mkdirs(baseDir); - fileSystem.setPermission(baseDir, DIR_PERMISSION); - } - this.basePath = fileSystem.resolvePath(baseDir); - } - - /** - * Creates a writer for the given fileName, with date as today. - */ - public ProtoMessageWriter<T> getWriter(String fileName) throws IOException { - Path filePath = getPathForDate(getNow().toLocalDate(), fileName); - return new ProtoMessageWriter<>(conf, filePath, parser); - } - - /** - * Creates a reader for the given filePath, no validation is done. - */ - public ProtoMessageReader<T> getReader(Path filePath) throws IOException { - return new ProtoMessageReader<>(conf, filePath, parser); - } - - /** - * Create a path for the given date and fileName. This can be used to create a reader. - */ - public Path getPathForDate(LocalDate date, String fileName) throws IOException { - Path path = new Path(basePath, getDirForDate(date)); - if (!fileSystem.exists(path)) { - fileSystem.mkdirs(path); - fileSystem.setPermission(path, DIR_PERMISSION); - } - return new Path(path, fileName); - } - - /** - * Extract the date from the directory name, this should be a directory created by this class. 
- */ - public LocalDate getDateFromDir(String dirName) { - if (!dirName.startsWith("date=")) { - throw new IllegalArgumentException("Invalid directory: "+ dirName); - } - return LocalDate.parse(dirName.substring(5), DateTimeFormatter.ISO_LOCAL_DATE); - } - - /** - * Returns the directory name for a given date. - */ - public String getDirForDate(LocalDate date) { - return "date=" + DateTimeFormatter.ISO_LOCAL_DATE.format(date); - } - - /** - * Find next available directory, after the given directory. - */ - public String getNextDirectory(String currentDir) throws IOException { - // Fast check, if the next day directory exists return it. - String nextDate = getDirForDate(getDateFromDir(currentDir).plusDays(1)); - if (fileSystem.exists(new Path(basePath, nextDate))) { - return nextDate; - } - // Have to scan the directory to find min date greater than currentDir. - String dirName = null; - for (FileStatus status : fileSystem.listStatus(basePath)) { - String name = status.getPath().getName(); - // String comparison is good enough, since its of form date=yyyy-MM-dd - if (name.compareTo(currentDir) > 0 && (dirName == null || name.compareTo(dirName) < 0)) { - dirName = name; - } - } - return dirName; - } - - /** - * Returns new or changed files in the given directory. The offsets are used to find - * changed files. - */ - public List<Path> scanForChangedFiles(String subDir, Map<String, Long> currentOffsets) - throws IOException { - Path dirPath = new Path(basePath, subDir); - List<Path> newFiles = new ArrayList<>(); - if (!fileSystem.exists(dirPath)) { - return newFiles; - } - for (FileStatus status : fileSystem.listStatus(dirPath)) { - String fileName = status.getPath().getName(); - Long offset = currentOffsets.get(fileName); - // If the offset was never added or offset < fileSize. 
- if (offset == null || offset < status.getLen()) { - newFiles.add(new Path(dirPath, fileName)); - } - } - return newFiles; - } - - /** - * Returns the current time, using the underlying clock in UTC time. - */ - public LocalDateTime getNow() { - // Use UTC date to ensure reader date is same on all timezones. - return LocalDateTime.ofEpochSecond(clock.getTime() / 1000, 0, ZoneOffset.UTC); - } - - public Configuration getConfig() { - return conf; - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/55bc2854/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java index 1ae8194..eef6ac9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java @@ -118,6 +118,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hive.common.util.ShutdownHookManager; +import org.apache.tez.dag.history.logging.proto.DatePartitionedLogger; +import org.apache.tez.dag.history.logging.proto.ProtoMessageWriter; import org.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -279,14 +281,30 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext { } } + private static final int MAX_RETRIES = 2; private void writeEvent(HiveHookEventProto event) { - try (ProtoMessageWriter<HiveHookEventProto> writer = logger.getWriter(logFileName)) { - writer.writeProto(event); - // This does not work hence, opening and closing file for every event.
- // writer.hflush(); - } catch (IOException e) { - LOG.error("Error writing proto message for query {}, eventType: {}: ", - event.getHiveQueryId(), event.getEventType(), e); + for (int retryCount = 0; retryCount <= MAX_RETRIES; ++retryCount) { + try (ProtoMessageWriter<HiveHookEventProto> writer = logger.getWriter(logFileName)) { + writer.writeProto(event); + // This does not work hence, opening and closing file for every event. + // writer.hflush(); + return; + } catch (IOException e) { + if (retryCount >= MAX_RETRIES) { // Out of retries: log with the stack trace and give up. + LOG.error("Error writing proto message for query {}, eventType: {}: ", + event.getHiveQueryId(), event.getEventType(), e); + return; + } + LOG.warn("Error writing proto message for query {}, eventType: {}, retryCount: {}," + + " error: {} ", event.getHiveQueryId(), event.getEventType(), retryCount, + e.getMessage()); + try { + Thread.sleep(1000L * retryCount * retryCount); // 0s, then 1s: reopening a closed fs may fix it. + } catch (InterruptedException e1) { + LOG.warn("Got interrupted in retry sleep.", e1); + Thread.currentThread().interrupt(); // Keep the flag; remaining retries skip the backoff. + } + } } } http://git-wip-us.apache.org/repos/asf/hive/blob/55bc2854/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageReader.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageReader.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageReader.java deleted file mode 100644 index 1c4296c..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageReader.java +++ /dev/null @@ -1,66 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License.
You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.ql.hooks; - -import java.io.Closeable; -import java.io.IOException; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.SequenceFile; - -import com.google.protobuf.MessageLite; -import com.google.protobuf.Parser; - -public class ProtoMessageReader<T extends MessageLite> implements Closeable { - private final Path filePath; - private final SequenceFile.Reader reader; - private final ProtoMessageWritable<T> writable; - - ProtoMessageReader(Configuration conf, Path filePath, Parser<T> parser) throws IOException { - this.filePath = filePath; - this.reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(filePath)); - this.writable = new ProtoMessageWritable<>(parser); - } - - public Path getFilePath() { - return filePath; - } - - public void setOffset(long offset) throws IOException { - reader.seek(offset); - } - - public long getOffset() throws IOException { - return reader.getPosition(); - } - - public T readEvent() throws IOException { - if (!reader.next(NullWritable.get(), writable)) { - return null; - } - return writable.getMessage(); - } - - @Override - public void close() throws IOException { - reader.close(); - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/55bc2854/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageWritable.java ---------------------------------------------------------------------- diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageWritable.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageWritable.java deleted file mode 100644 index 61d8449..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageWritable.java +++ /dev/null @@ -1,101 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hive.ql.hooks; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; - -import org.apache.hadoop.io.Writable; - -import com.google.protobuf.CodedInputStream; -import com.google.protobuf.CodedOutputStream; -import com.google.protobuf.MessageLite; -import com.google.protobuf.Parser; - -public class ProtoMessageWritable<T extends MessageLite> implements Writable { - private T message; - private final Parser<T> parser; - private DataOutputStream dos; - private CodedOutputStream cos; - private DataInputStream din; - private CodedInputStream cin; - - ProtoMessageWritable(Parser<T> parser) { - this.parser = parser; - } - - public T getMessage() { - return message; - } - - public void setMessage(T message) { - this.message = message; - } - - private static class DataOutputStream extends OutputStream { - DataOutput out; - @Override - public void write(int b) throws IOException { - out.write(b); - } - - @Override - public void write(byte b[], int off, int len) throws IOException { - out.write(b, off, len); - } - } - - @Override - public void write(DataOutput out) throws IOException { - if (dos == null) { - dos = new DataOutputStream(); - cos = CodedOutputStream.newInstance(dos); - } - dos.out = out; - cos.writeMessageNoTag(message); - cos.flush(); - } - - private static class DataInputStream extends InputStream { - DataInput in; - @Override - public int read() throws IOException { - try { - return in.readUnsignedByte(); - } catch (EOFException e) { - return -1; - } - } - } - - @Override - public void readFields(DataInput in) throws IOException { - if (din == null) { - din = new DataInputStream(); - cin = CodedInputStream.newInstance(din); - cin.setSizeLimit(Integer.MAX_VALUE); - } - din.in = in; - message = cin.readMessage(parser, null); - } -} 
http://git-wip-us.apache.org/repos/asf/hive/blob/55bc2854/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageWriter.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageWriter.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageWriter.java deleted file mode 100644 index ed8de93..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/ProtoMessageWriter.java +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hive.ql.hooks; - -import java.io.Closeable; -import java.io.IOException; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.SequenceFile.CompressionType; - -import com.google.protobuf.MessageLite; -import com.google.protobuf.Parser; - -public class ProtoMessageWriter<T extends MessageLite> implements Closeable { - private final Path filePath; - private final SequenceFile.Writer writer; - private final ProtoMessageWritable<T> writable; - - ProtoMessageWriter(Configuration conf, Path filePath, Parser<T> parser) throws IOException { - this.filePath = filePath; - this.writer = SequenceFile.createWriter( - conf, - SequenceFile.Writer.file(filePath), - SequenceFile.Writer.keyClass(NullWritable.class), - SequenceFile.Writer.valueClass(ProtoMessageWritable.class), - SequenceFile.Writer.appendIfExists(true), - SequenceFile.Writer.compression(CompressionType.RECORD)); - this.writable = new ProtoMessageWritable<>(parser); - } - - public Path getPath() { - return filePath; - } - - public long getOffset() throws IOException { - return writer.getLength(); - } - - public void writeProto(T message) throws IOException { - writable.setMessage(message); - writer.append(NullWritable.get(), writable); - } - - public void hflush() throws IOException { - writer.hflush(); - } - - @Override - public void close() throws IOException { - writer.close(); - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/55bc2854/ql/src/java/org/apache/tez/dag/history/logging/proto/DatePartitionedLogger.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/tez/dag/history/logging/proto/DatePartitionedLogger.java b/ql/src/java/org/apache/tez/dag/history/logging/proto/DatePartitionedLogger.java new file mode 100644 index 0000000..d6a5121 --- /dev/null +++ 
b/ql/src/java/org/apache/tez/dag/history/logging/proto/DatePartitionedLogger.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.history.logging.proto; + +import java.io.IOException; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.yarn.util.Clock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.protobuf.MessageLite; +import com.google.protobuf.Parser; + +/** + * Class to create proto reader and writer for a date partitioned directory structure. + * + * @param <T> The proto message type. 
+ */ +public class DatePartitionedLogger<T extends MessageLite> { + private static final Logger LOG = LoggerFactory.getLogger(DatePartitionedLogger.class.getName()); + // Everyone has permission to write, but with sticky set so that delete is restricted. + // This is required, since the path is same for all users and everyone writes into it. + private static final FsPermission DIR_PERMISSION = FsPermission.createImmutable((short)01777); + + private final Parser<T> parser; + private final Path basePath; + private final Configuration conf; + private final Clock clock; + + public DatePartitionedLogger(Parser<T> parser, Path baseDir, Configuration conf, Clock clock) + throws IOException { + this.conf = conf; + this.clock = clock; + this.parser = parser; + createDirIfNotExists(baseDir); + this.basePath = baseDir.getFileSystem(conf).resolvePath(baseDir); + } + + private void createDirIfNotExists(Path path) throws IOException { + FileSystem fileSystem = path.getFileSystem(conf); + try { + if (!fileSystem.exists(path)) { + fileSystem.mkdirs(path); + fileSystem.setPermission(path, DIR_PERMISSION); + } + } catch (IOException e) { + // Ignore this exception, if there is a problem it'll fail when trying to read or write. + LOG.warn("Error while trying to set permission: ", e); + } + } + + /** + * Creates a writer for the given fileName, with date as today. + */ + public ProtoMessageWriter<T> getWriter(String fileName) throws IOException { + Path filePath = getPathForDate(getNow().toLocalDate(), fileName); + return new ProtoMessageWriter<>(conf, filePath, parser); + } + + /** + * Creates a reader for the given filePath, no validation is done. + */ + public ProtoMessageReader<T> getReader(Path filePath) throws IOException { + return new ProtoMessageReader<>(conf, filePath, parser); + } + + /** + * Create a path for the given date and fileName. This can be used to create a reader. 
+ */ + public Path getPathForDate(LocalDate date, String fileName) throws IOException { + Path path = new Path(basePath, getDirForDate(date)); + createDirIfNotExists(path); + return new Path(path, fileName); + } + + /** + * Extract the date from the directory name, this should be a directory created by this class. + */ + public LocalDate getDateFromDir(String dirName) { + if (!dirName.startsWith("date=")) { + throw new IllegalArgumentException("Invalid directory: "+ dirName); + } + return LocalDate.parse(dirName.substring(5), DateTimeFormatter.ISO_LOCAL_DATE); + } + + /** + * Returns the directory name for a given date. + */ + public String getDirForDate(LocalDate date) { + return "date=" + DateTimeFormatter.ISO_LOCAL_DATE.format(date); + } + + /** + * Find next available directory, after the given directory. + */ + public String getNextDirectory(String currentDir) throws IOException { + // Fast check, if the next day directory exists return it. + String nextDate = getDirForDate(getDateFromDir(currentDir).plusDays(1)); + FileSystem fileSystem = basePath.getFileSystem(conf); + if (fileSystem.exists(new Path(basePath, nextDate))) { + return nextDate; + } + // Have to scan the directory to find min date greater than currentDir. + String dirName = null; + for (FileStatus status : fileSystem.listStatus(basePath)) { + String name = status.getPath().getName(); + // String comparison is good enough, since its of form date=yyyy-MM-dd + if (name.compareTo(currentDir) > 0 && (dirName == null || name.compareTo(dirName) < 0)) { + dirName = name; + } + } + return dirName; + } + + /** + * Returns new or changed files in the given directory. The offsets are used to find + * changed files. 
+ */ + public List<Path> scanForChangedFiles(String subDir, Map<String, Long> currentOffsets) + throws IOException { + Path dirPath = new Path(basePath, subDir); + FileSystem fileSystem = basePath.getFileSystem(conf); + List<Path> newFiles = new ArrayList<>(); + if (!fileSystem.exists(dirPath)) { + return newFiles; + } + for (FileStatus status : fileSystem.listStatus(dirPath)) { + String fileName = status.getPath().getName(); + Long offset = currentOffsets.get(fileName); + // If the offset was never added or offset < fileSize. + if (offset == null || offset < status.getLen()) { + newFiles.add(new Path(dirPath, fileName)); + } + } + return newFiles; + } + + /** + * Returns the current time, using the underlying clock in UTC time. + */ + public LocalDateTime getNow() { + // Use UTC date to ensure reader date is same on all timezones. + return LocalDateTime.ofEpochSecond(clock.getTime() / 1000, 0, ZoneOffset.UTC); + } + + public Configuration getConfig() { + return conf; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/55bc2854/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java b/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java new file mode 100644 index 0000000..5a3c63a --- /dev/null +++ b/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.history.logging.proto; + +import java.io.Closeable; +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.SequenceFile; + +import com.google.protobuf.MessageLite; +import com.google.protobuf.Parser; + +public class ProtoMessageReader<T extends MessageLite> implements Closeable { + private final Path filePath; + private final SequenceFile.Reader reader; + private final ProtoMessageWritable<T> writable; + + ProtoMessageReader(Configuration conf, Path filePath, Parser<T> parser) throws IOException { + this.filePath = filePath; + this.reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(filePath)); + this.writable = new ProtoMessageWritable<>(parser); + } + + public Path getFilePath() { + return filePath; + } + + public void setOffset(long offset) throws IOException { + reader.seek(offset); + } + + public long getOffset() throws IOException { + return reader.getPosition(); + } + + public T readEvent() throws IOException { + if (!reader.next(NullWritable.get(), writable)) { + return null; + } + return writable.getMessage(); + } + + @Override + public void close() throws IOException { + reader.close(); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/55bc2854/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWritable.java ---------------------------------------------------------------------- diff --git 
a/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWritable.java b/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWritable.java new file mode 100644 index 0000000..7a08e20 --- /dev/null +++ b/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWritable.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez.dag.history.logging.proto; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.hadoop.io.Writable; + +import com.google.protobuf.CodedInputStream; +import com.google.protobuf.CodedOutputStream; +import com.google.protobuf.MessageLite; +import com.google.protobuf.Parser; + +public class ProtoMessageWritable<T extends MessageLite> implements Writable { + private T message; + private final Parser<T> parser; + private DataOutputStream dos; + private CodedOutputStream cos; + private DataInputStream din; + private CodedInputStream cin; + + ProtoMessageWritable(Parser<T> parser) { + this.parser = parser; + } + + public T getMessage() { + return message; + } + + public void setMessage(T message) { + this.message = message; + } + + private static class DataOutputStream extends OutputStream { + DataOutput out; + @Override + public void write(int b) throws IOException { + out.write(b); + } + + @Override + public void write(byte b[], int off, int len) throws IOException { + out.write(b, off, len); + } + } + + @Override + public void write(DataOutput out) throws IOException { + if (dos == null) { + dos = new DataOutputStream(); + cos = CodedOutputStream.newInstance(dos); + } + dos.out = out; + cos.writeMessageNoTag(message); + cos.flush(); + } + + private static class DataInputStream extends InputStream { + DataInput in; + @Override + public int read() throws IOException { + try { + return in.readUnsignedByte(); + } catch (EOFException e) { + return -1; + } + } + } + + @Override + public void readFields(DataInput in) throws IOException { + if (din == null) { + din = new DataInputStream(); + cin = CodedInputStream.newInstance(din); + cin.setSizeLimit(Integer.MAX_VALUE); + } + din.in = in; + message = cin.readMessage(parser, null); + } +} 
http://git-wip-us.apache.org/repos/asf/hive/blob/55bc2854/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java b/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java new file mode 100644 index 0000000..c746bb6 --- /dev/null +++ b/ql/src/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez.dag.history.logging.proto; + +import java.io.Closeable; +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.SequenceFile.CompressionType; + +import com.google.protobuf.MessageLite; +import com.google.protobuf.Parser; + +public class ProtoMessageWriter<T extends MessageLite> implements Closeable { + private final Path filePath; + private final SequenceFile.Writer writer; + private final ProtoMessageWritable<T> writable; + + ProtoMessageWriter(Configuration conf, Path filePath, Parser<T> parser) throws IOException { + this.filePath = filePath; + this.writer = SequenceFile.createWriter( + conf, + SequenceFile.Writer.file(filePath), + SequenceFile.Writer.keyClass(NullWritable.class), + SequenceFile.Writer.valueClass(ProtoMessageWritable.class), + SequenceFile.Writer.appendIfExists(true), + SequenceFile.Writer.compression(CompressionType.RECORD)); + this.writable = new ProtoMessageWritable<>(parser); + } + + public Path getPath() { + return filePath; + } + + public long getOffset() throws IOException { + return writer.getLength(); + } + + public void writeProto(T message) throws IOException { + writable.setMessage(message); + writer.append(NullWritable.get(), writable); + } + + public void hflush() throws IOException { + writer.hflush(); + } + + @Override + public void close() throws IOException { + writer.close(); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/55bc2854/ql/src/java/org/apache/tez/dag/history/logging/proto/package-info.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/tez/dag/history/logging/proto/package-info.java b/ql/src/java/org/apache/tez/dag/history/logging/proto/package-info.java new file mode 100644 index 0000000..23ed460 --- /dev/null +++ 
b/ql/src/java/org/apache/tez/dag/history/logging/proto/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Logger code copied from the Tez codebase. This should be removed once we switch + * to Tez 0.9.2, at which point we should depend on the Tez libraries instead.
+ */ +package org.apache.tez.dag.history.logging.proto; http://git-wip-us.apache.org/repos/asf/hive/blob/55bc2854/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java b/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java index 5e117fe..98b73e8 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java @@ -38,6 +38,8 @@ import org.apache.hadoop.hive.ql.hooks.proto.HiveHookEvents.MapFieldEntry; import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.plan.HiveOperation; import org.apache.hadoop.yarn.util.SystemClock; +import org.apache.tez.dag.history.logging.proto.DatePartitionedLogger; +import org.apache.tez.dag.history.logging.proto.ProtoMessageReader; import org.junit.Assert; import org.junit.Before; import org.junit.Rule;