AMBARI-19780. Hive View: Logs are not complete for Hive View. (gauravn7)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/0dfe8b6c Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/0dfe8b6c Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/0dfe8b6c Branch: refs/heads/branch-dev-patch-upgrade Commit: 0dfe8b6c9792aa2d40d8ecaa75f0256e9820eb89 Parents: 01f4a69 Author: Gaurav Nagar <[email protected]> Authored: Wed Feb 1 12:37:21 2017 +0530 Committer: Gaurav Nagar <[email protected]> Committed: Wed Feb 1 12:37:21 2017 +0530 ---------------------------------------------------------------------- .../ambari/view/hive2/actor/LogAggregator.java | 18 ++++++++++++------ .../view/hive2/actor/StatementExecutor.java | 9 ++++++--- .../hive2/actor/message/StartLogAggregation.java | 10 +++++++++- .../ambari/view/hive20/actor/LogAggregator.java | 18 ++++++++++++------ .../view/hive20/actor/StatementExecutor.java | 10 +++++++--- .../hive20/actor/message/StartLogAggregation.java | 10 +++++++++- 6 files changed, 55 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/0dfe8b6c/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/LogAggregator.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/LogAggregator.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/LogAggregator.java index 9412f81..69b4a56 100644 --- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/LogAggregator.java +++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/LogAggregator.java @@ -29,6 +29,7 @@ import org.apache.ambari.view.hive2.actor.message.StartLogAggregation; import org.apache.ambari.view.utils.hdfs.HdfsApi; import org.apache.ambari.view.utils.hdfs.HdfsApiException; import 
org.apache.ambari.view.utils.hdfs.HdfsUtil; +import org.apache.commons.lang.StringUtils; import org.apache.hive.jdbc.HiveStatement; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,17 +48,17 @@ public class LogAggregator extends HiveActor { public static final int AGGREGATION_INTERVAL = 5 * 1000; private final HdfsApi hdfsApi; - private final HiveStatement statement; + private HiveStatement statement; private final String logFile; private Cancellable moreLogsScheduler; private ActorRef parent; private boolean hasStartedFetching = false; private boolean shouldFetchMore = true; + private String allLogs = ""; - public LogAggregator(HdfsApi hdfsApi, HiveStatement statement, String logFile) { + public LogAggregator(HdfsApi hdfsApi, String logFile) { this.hdfsApi = hdfsApi; - this.statement = statement; this.logFile = logFile; } @@ -65,7 +66,7 @@ public class LogAggregator extends HiveActor { public void handleMessage(HiveMessage hiveMessage) { Object message = hiveMessage.getMessage(); if (message instanceof StartLogAggregation) { - start(); + start((StartLogAggregation) message); } if (message instanceof GetMoreLogs) { @@ -79,10 +80,15 @@ public class LogAggregator extends HiveActor { } } - private void start() { + private void start(StartLogAggregation message) { + this.statement = message.getHiveStatement(); parent = this.getSender(); hasStartedFetching = false; shouldFetchMore = true; + String logTitle = "Logs for Query '" + message.getStatement() + "'"; + String repeatSeperator = StringUtils.repeat("=", logTitle.length()); + allLogs += String.format("\n\n%s\n%s\n%s\n", repeatSeperator, logTitle, repeatSeperator); + if (!(moreLogsScheduler == null || moreLogsScheduler.isCancelled())) { moreLogsScheduler.cancel(); } @@ -94,7 +100,7 @@ public class LogAggregator extends HiveActor { private void getMoreLogs() throws SQLException, HdfsApiException { List<String> logs = statement.getQueryLog(); if (logs.size() > 0 && shouldFetchMore) { - String allLogs = 
Joiner.on("\n").skipNulls().join(logs); + allLogs = allLogs + "\n" + Joiner.on("\n").skipNulls().join(logs); HdfsUtil.putStringToFile(hdfsApi, logFile, allLogs); if(!statement.hasMoreLogs()) { shouldFetchMore = false; http://git-wip-us.apache.org/repos/asf/ambari/blob/0dfe8b6c/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/StatementExecutor.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/StatementExecutor.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/StatementExecutor.java index d7b4f54..6cdee81 100644 --- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/StatementExecutor.java +++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/StatementExecutor.java @@ -96,7 +96,6 @@ public class StatementExecutor extends HiveActor { LOG.error("Failed to execute statement: {}. {}", message.getStatement(), e); sender().tell(new ResultInformation(message.getId(), new Failure("Failed to execute statement: " + message.getStatement(), e)), self()); } finally { - stopLogAggregation(); stopGUIDFetch(); } } @@ -120,11 +119,11 @@ public class StatementExecutor extends HiveActor { private void startLogAggregation(HiveStatement statement, String sqlStatement, String logFile) { if (logAggregator == null) { logAggregator = getContext().actorOf( - Props.create(LogAggregator.class, hdfsApi, statement, logFile) + Props.create(LogAggregator.class, hdfsApi, logFile) .withDispatcher("akka.actor.misc-dispatcher"), "LogAggregator:" + UUID.randomUUID().toString()); } LOG.info("Fetching query logs for statement: {}", sqlStatement); - logAggregator.tell(new StartLogAggregation(sqlStatement), getSelf()); + logAggregator.tell(new StartLogAggregation(sqlStatement, statement), getSelf()); } private void stopLogAggregation() { @@ -134,6 +133,10 @@ public class StatementExecutor extends 
HiveActor { logAggregator = null; } + @Override + public void postStop() throws Exception { + stopLogAggregation(); + } private void getColumnMetaData(GetColumnMetadataJob message) { try { http://git-wip-us.apache.org/repos/asf/ambari/blob/0dfe8b6c/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/StartLogAggregation.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/StartLogAggregation.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/StartLogAggregation.java index b56da08..48fbced 100644 --- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/StartLogAggregation.java +++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/StartLogAggregation.java @@ -18,14 +18,22 @@ package org.apache.ambari.view.hive2.actor.message; +import org.apache.hive.jdbc.HiveStatement; + public class StartLogAggregation { private String statement; + private HiveStatement hiveStatement; public StartLogAggregation() { } - public StartLogAggregation(String statement) { + public StartLogAggregation(String statement, HiveStatement hiveStatement) { this.statement = statement; + this.hiveStatement = hiveStatement; + } + + public HiveStatement getHiveStatement() { + return hiveStatement; } public String getStatement() { http://git-wip-us.apache.org/repos/asf/ambari/blob/0dfe8b6c/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/LogAggregator.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/LogAggregator.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/LogAggregator.java index f9c21b4..600ea64 100644 --- a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/LogAggregator.java 
+++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/LogAggregator.java @@ -28,6 +28,7 @@ import org.apache.ambari.view.hive20.actor.message.StartLogAggregation; import org.apache.ambari.view.utils.hdfs.HdfsApi; import org.apache.ambari.view.utils.hdfs.HdfsApiException; import org.apache.ambari.view.utils.hdfs.HdfsUtil; +import org.apache.commons.lang3.StringUtils; import org.apache.hive.jdbc.HiveStatement; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,17 +47,17 @@ public class LogAggregator extends HiveActor { public static final int AGGREGATION_INTERVAL = 5 * 1000; private final HdfsApi hdfsApi; - private final HiveStatement statement; + private HiveStatement statement; private final String logFile; private Cancellable moreLogsScheduler; private ActorRef parent; private boolean hasStartedFetching = false; private boolean shouldFetchMore = true; + private String allLogs = ""; - public LogAggregator(HdfsApi hdfsApi, HiveStatement statement, String logFile) { + public LogAggregator(HdfsApi hdfsApi, String logFile) { this.hdfsApi = hdfsApi; - this.statement = statement; this.logFile = logFile; } @@ -64,7 +65,7 @@ public class LogAggregator extends HiveActor { public void handleMessage(HiveMessage hiveMessage) { Object message = hiveMessage.getMessage(); if (message instanceof StartLogAggregation) { - start(); + start((StartLogAggregation) message); } if (message instanceof GetMoreLogs) { @@ -79,10 +80,15 @@ public class LogAggregator extends HiveActor { } } - private void start() { + private void start(StartLogAggregation message) { + this.statement = message.getHiveStatement(); parent = this.getSender(); hasStartedFetching = false; shouldFetchMore = true; + String logTitle = "Logs for Query '" + message.getStatement() + "'"; + String repeatSeperator = StringUtils.repeat("=", logTitle.length()); + allLogs += String.format("\n\n%s\n%s\n%s\n", repeatSeperator, logTitle, repeatSeperator); + if (!(moreLogsScheduler == null || 
moreLogsScheduler.isCancelled())) { moreLogsScheduler.cancel(); } @@ -94,7 +100,7 @@ public class LogAggregator extends HiveActor { private void getMoreLogs() throws SQLException, HdfsApiException { List<String> logs = statement.getQueryLog(); if (logs.size() > 0 && shouldFetchMore) { - String allLogs = Joiner.on("\n").skipNulls().join(logs); + allLogs = allLogs + "\n" + Joiner.on("\n").skipNulls().join(logs); HdfsUtil.putStringToFile(hdfsApi, logFile, allLogs); if(!statement.hasMoreLogs()) { shouldFetchMore = false; http://git-wip-us.apache.org/repos/asf/ambari/blob/0dfe8b6c/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/StatementExecutor.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/StatementExecutor.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/StatementExecutor.java index 03332d9..c3ed14b 100644 --- a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/StatementExecutor.java +++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/StatementExecutor.java @@ -96,7 +96,6 @@ public class StatementExecutor extends HiveActor { LOG.error("Failed to execute statement: {}. 
{}", message.getStatement(), e); sender().tell(new ResultInformation(message.getId(), new Failure("Failed to execute statement: " + message.getStatement(), e)), self()); } finally { - stopLogAggregation(); stopGUIDFetch(); } } @@ -120,11 +119,11 @@ public class StatementExecutor extends HiveActor { private void startLogAggregation(HiveStatement statement, String sqlStatement, String logFile) { if (logAggregator == null) { logAggregator = getContext().actorOf( - Props.create(LogAggregator.class, hdfsApi, statement, logFile) + Props.create(LogAggregator.class, hdfsApi, logFile) .withDispatcher("akka.actor.misc-dispatcher"), "LogAggregator:" + UUID.randomUUID().toString()); } LOG.info("Fetching query logs for statement: {}", sqlStatement); - logAggregator.tell(new StartLogAggregation(sqlStatement), getSelf()); + logAggregator.tell(new StartLogAggregation(sqlStatement, statement), getSelf()); } private void stopLogAggregation() { @@ -134,6 +133,11 @@ public class StatementExecutor extends HiveActor { logAggregator = null; } + @Override + public void postStop() throws Exception { + stopLogAggregation(); + } + private void getColumnMetaData(GetColumnMetadataJob message) { try { http://git-wip-us.apache.org/repos/asf/ambari/blob/0dfe8b6c/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/message/StartLogAggregation.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/message/StartLogAggregation.java b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/message/StartLogAggregation.java index 922ad1d..8aab04f 100644 --- a/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/message/StartLogAggregation.java +++ b/contrib/views/hive20/src/main/java/org/apache/ambari/view/hive20/actor/message/StartLogAggregation.java @@ -18,14 +18,22 @@ package org.apache.ambari.view.hive20.actor.message; +import 
org.apache.hive.jdbc.HiveStatement; + public class StartLogAggregation { private String statement; + private HiveStatement hiveStatement; public StartLogAggregation() { } - public StartLogAggregation(String statement) { + public StartLogAggregation(String statement, HiveStatement hiveStatement) { this.statement = statement; + this.hiveStatement = hiveStatement; + } + + public HiveStatement getHiveStatement() { + return hiveStatement; } public String getStatement() {
