AMBARI-17192. Enable cancellation of the currently running long-running job. (dipayanb)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/60057378 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/60057378 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/60057378 Branch: refs/heads/branch-2.4 Commit: 6005737844c73bd3cd661af34b4a58cc85acedd4 Parents: 22cc2d5 Author: Dipayan Bhowmick <[email protected]> Authored: Wed Jun 8 12:22:38 2016 +0530 Committer: Dipayan Bhowmick <[email protected]> Committed: Wed Jun 15 11:38:49 2016 +0530 ---------------------------------------------------------------------- .../ambari/view/hive2/ConnectionDelegate.java | 13 +- .../view/hive2/HiveJdbcConnectionDelegate.java | 90 ++--- .../view/hive2/actor/AsyncJdbcConnector.java | 193 ---------- .../view/hive2/actor/AsyncQueryExecutor.java | 92 ----- .../view/hive2/actor/GetResultHolder.java | 47 --- .../ambari/view/hive2/actor/JdbcConnector.java | 352 +++++++++++++++---- .../ambari/view/hive2/actor/LogAggregator.java | 31 +- .../view/hive2/actor/OperationController.java | 146 +++----- .../view/hive2/actor/ResultSetIterator.java | 75 +--- .../view/hive2/actor/StatementExecutor.java | 147 ++++++++ .../view/hive2/actor/SyncJdbcConnector.java | 174 --------- .../view/hive2/actor/YarnAtsGUIDFetcher.java | 69 ++++ .../ambari/view/hive2/actor/YarnAtsParser.java | 32 -- .../view/hive2/actor/message/AdvanceCursor.java | 32 -- .../hive2/actor/message/AssignResultSet.java | 48 --- .../hive2/actor/message/AssignStatement.java | 46 --- .../view/hive2/actor/message/AsyncJob.java | 52 --- .../view/hive2/actor/message/Connect.java | 22 +- .../ambari/view/hive2/actor/message/DDLJob.java | 73 ---- .../actor/message/GetColumnMetadataJob.java | 1 + .../actor/message/JobExecutionCompleted.java | 21 -- .../hive2/actor/message/ResultInformation.java | 83 +++++ .../hive2/actor/message/ResultNotReady.java | 40 +++ .../view/hive2/actor/message/ResultReady.java | 27 +- .../view/hive2/actor/message/RunStatement.java | 
73 ++++ .../hive2/actor/message/SQLStatementJob.java | 65 ++++ .../actor/message/StartLogAggregation.java | 15 +- .../view/hive2/actor/message/SyncJob.java | 27 -- .../view/hive2/actor/message/job/CancelJob.java | 40 +++ .../actor/message/job/ExecuteNextStatement.java | 22 ++ .../actor/message/job/UpdateYarnAtsGuid.java | 38 ++ .../view/hive2/client/AsyncJobRunner.java | 15 +- .../view/hive2/client/AsyncJobRunnerImpl.java | 101 +++--- .../view/hive2/client/ConnectionConfig.java | 5 + .../view/hive2/client/DDLDelegatorImpl.java | 8 +- .../ambari/view/hive2/internal/Either.java | 59 ++-- .../view/hive2/resources/jobs/Aggregator.java | 7 - .../view/hive2/resources/jobs/JobService.java | 8 +- .../jobs/ResultsPaginationController.java | 6 +- .../jobs/viewJobs/JobControllerImpl.java | 13 +- .../utils/ResultFetchFormattedException.java | 27 ++ .../utils/ResultNotReadyFormattedException.java | 27 ++ .../ui/hive-web/app/controllers/index.js | 14 +- .../ui/hive-web/app/controllers/upload-table.js | 4 +- .../ui/hive-web/app/services/job-progress.js | 9 + .../ambari/view/hive2/AsyncQueriesTest.java | 124 ------- .../ambari/view/hive2/InactivityTest.java | 109 ------ .../apache/ambari/view/hive2/Mocksupport.java | 94 ----- .../ambari/view/hive2/SyncQueriesTest.java | 141 -------- 49 files changed, 1218 insertions(+), 1739 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/60057378/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/ConnectionDelegate.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/ConnectionDelegate.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/ConnectionDelegate.java index 918dc68..bb1fde8 100644 --- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/ConnectionDelegate.java +++ 
b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/ConnectionDelegate.java @@ -19,10 +19,7 @@ package org.apache.ambari.view.hive2; import com.google.common.base.Optional; -import org.apache.ambari.view.hive2.actor.message.DDLJob; import org.apache.ambari.view.hive2.actor.message.GetColumnMetadataJob; -import org.apache.ambari.view.hive2.actor.message.HiveJob; -import org.apache.ambari.view.hive2.internal.HiveResult; import org.apache.hive.jdbc.HiveConnection; import org.apache.hive.jdbc.HiveStatement; @@ -30,11 +27,11 @@ import java.sql.ResultSet; import java.sql.SQLException; public interface ConnectionDelegate { - Optional<ResultSet> execute(HiveConnection connection, DDLJob job) throws SQLException; - Optional<ResultSet> executeSync(HiveConnection connection, DDLJob job) throws SQLException; - Optional<ResultSet> getColumnMetadata(HiveConnection connection, GetColumnMetadataJob job) throws SQLException; - Optional<ResultSet> getCurrentResultSet(); - Optional<HiveStatement> getCurrentStatement(); + HiveStatement createStatement(HiveConnection connection) throws SQLException; + Optional<ResultSet> execute(String statement) throws SQLException; + Optional<ResultSet> execute(HiveConnection connection, String statement) throws SQLException; + ResultSet getColumnMetadata(HiveConnection connection, GetColumnMetadataJob job) throws SQLException; + void cancel() throws SQLException; void closeResultSet(); void closeStatement(); } http://git-wip-us.apache.org/repos/asf/ambari/blob/60057378/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/HiveJdbcConnectionDelegate.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/HiveJdbcConnectionDelegate.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/HiveJdbcConnectionDelegate.java index e8d3333..bd2b9ba 100644 --- 
a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/HiveJdbcConnectionDelegate.java +++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/HiveJdbcConnectionDelegate.java @@ -19,11 +19,7 @@ package org.apache.ambari.view.hive2; import com.google.common.base.Optional; -import org.apache.ambari.view.hive2.actor.message.DDLJob; import org.apache.ambari.view.hive2.actor.message.GetColumnMetadataJob; -import org.apache.ambari.view.hive2.actor.message.HiveJob; -import org.apache.ambari.view.hive2.actor.message.job.Result; -import org.apache.ambari.view.hive2.internal.HiveResult; import org.apache.hive.jdbc.HiveConnection; import org.apache.hive.jdbc.HiveStatement; @@ -36,85 +32,51 @@ public class HiveJdbcConnectionDelegate implements ConnectionDelegate { private ResultSet currentResultSet; private HiveStatement currentStatement; - private String atsGuid; @Override - public Optional<ResultSet> execute(HiveConnection connection, DDLJob job) throws SQLException { - - try { - Statement statement = connection.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE,ResultSet.CONCUR_READ_ONLY); - currentStatement = (HiveStatement) statement; + public HiveStatement createStatement(HiveConnection connection) throws SQLException { + Statement statement = connection.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY); + currentStatement = (HiveStatement) statement; + return currentStatement; + } - for (String syncStatement : job.getSyncStatements()) { - // we don't care about the result - // fail all if one fails - statement.execute(syncStatement); - } + @Override + public Optional<ResultSet> execute(String statement) throws SQLException { + if (currentStatement == null) { + throw new SQLException("Statement not created. 
Cannot execute Hive queries"); + } - HiveStatement hiveStatement = (HiveStatement) statement; - boolean result = hiveStatement.executeAsync(job.getAsyncStatement()); - atsGuid = hiveStatement.getYarnATSGuid(); - if (result) { - // query has a result set - ResultSet resultSet = hiveStatement.getResultSet(); - currentResultSet = resultSet; - Optional<ResultSet> resultSetOptional = Optional.of(resultSet); - return resultSetOptional; + boolean hasResultSet = currentStatement.execute(statement); - } + if (hasResultSet) { + ResultSet resultSet = currentStatement.getResultSet(); + currentResultSet = resultSet; + return Optional.of(resultSet); + } else { return Optional.absent(); - - } catch (SQLException e) { - // Close the statement on any error - currentStatement.close(); - throw e; } } @Override - public Optional<ResultSet> executeSync(HiveConnection connection, DDLJob job) throws SQLException { - try { - Statement statement = connection.createStatement(); - currentStatement = (HiveStatement) statement; - - boolean hasResultSet = false; - for (String syncStatement : job.getStatements()) { - // we don't care about the result - // fail all if one fails - hasResultSet = statement.execute(syncStatement); - } - - if (hasResultSet) { - ResultSet resultSet = statement.getResultSet(); - //HiveResult result = new HiveResult(resultSet); - return Optional.of(resultSet); - } else { - return Optional.absent(); - } - } catch (SQLException e) { - // Close the statement on any error - currentStatement.close(); - throw e; - } + public Optional<ResultSet> execute(HiveConnection connection, String sqlStatement) throws SQLException { + createStatement(connection); + return execute(sqlStatement); } @Override - public Optional<ResultSet> getColumnMetadata(HiveConnection connection, GetColumnMetadataJob job) throws SQLException { + public ResultSet getColumnMetadata(HiveConnection connection, GetColumnMetadataJob job) throws SQLException { DatabaseMetaData metaData = 
connection.getMetaData(); ResultSet resultSet = metaData.getColumns("", job.getSchemaPattern(), job.getTablePattern(), job.getColumnPattern()); currentResultSet = resultSet; - return Optional.of(resultSet); + return resultSet; } @Override - public Optional<ResultSet> getCurrentResultSet() { - return Optional.fromNullable(currentResultSet); - } - - @Override - public Optional<HiveStatement> getCurrentStatement() { - return Optional.fromNullable(currentStatement); + public void cancel() throws SQLException { + if (currentStatement != null) { + currentStatement.cancel(); + } } @Override @@ -130,7 +92,7 @@ public class HiveJdbcConnectionDelegate implements ConnectionDelegate { } @Override - public void closeStatement() { + public void closeStatement() { try { if (currentStatement != null) { currentStatement.close(); http://git-wip-us.apache.org/repos/asf/ambari/blob/60057378/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/AsyncJdbcConnector.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/AsyncJdbcConnector.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/AsyncJdbcConnector.java deleted file mode 100644 index 9a5992c..0000000 --- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/AsyncJdbcConnector.java +++ /dev/null @@ -1,193 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ambari.view.hive2.actor; - -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.actor.PoisonPill; -import akka.actor.Props; -import com.google.common.base.Optional; -import org.apache.ambari.view.ViewContext; -import org.apache.ambari.view.hive2.ConnectionDelegate; -import org.apache.ambari.view.hive2.actor.message.AsyncJob; -import org.apache.ambari.view.hive2.actor.message.HiveMessage; -import org.apache.ambari.view.hive2.actor.message.RegisterActor; -import org.apache.ambari.view.hive2.actor.message.ResultReady; -import org.apache.ambari.view.hive2.actor.message.StartLogAggregation; -import org.apache.ambari.view.hive2.actor.message.job.AsyncExecutionFailed; -import org.apache.ambari.view.hive2.actor.message.lifecycle.InactivityCheck; -import org.apache.ambari.view.hive2.internal.Either; -import org.apache.ambari.view.hive2.persistence.Storage; -import org.apache.ambari.view.hive2.persistence.utils.ItemNotFound; -import org.apache.ambari.view.hive2.resources.jobs.viewJobs.Job; -import org.apache.ambari.view.hive2.resources.jobs.viewJobs.JobImpl; -import org.apache.ambari.view.utils.hdfs.HdfsApi; -import org.apache.hive.jdbc.HiveConnection; -import org.apache.hive.jdbc.HiveStatement; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.concurrent.duration.Duration; - -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.concurrent.TimeUnit; - -public class AsyncJdbcConnector extends JdbcConnector { - - private final Logger LOG = 
LoggerFactory.getLogger(getClass()); - - private ActorRef logAggregator = null; - private ActorRef asyncQueryExecutor = null; - private ActorRef resultSetActor = null; - - - public AsyncJdbcConnector(ViewContext viewContext, HdfsApi hdfsApi, ActorSystem system, ActorRef parent,ActorRef deathWatch, ConnectionDelegate connectionDelegate, Storage storage) { - super(viewContext, hdfsApi, system, parent,deathWatch, connectionDelegate, storage); - } - - @Override - protected void handleJobMessage(HiveMessage message) { - Object job = message.getMessage(); - if (job instanceof AsyncJob) { - LOG.debug("Executing async job " + message.toString()); - execute((AsyncJob) job); - } - } - - @Override - protected boolean isAsync() { - return true; - } - - @Override - protected void cleanUpChildren() { - if(logAggregator != null && !logAggregator.isTerminated()) { - LOG.debug("Sending poison pill to log aggregator"); - logAggregator.tell(PoisonPill.getInstance(), self()); - } - - if(asyncQueryExecutor != null && !asyncQueryExecutor.isTerminated()) { - LOG.debug("Sending poison pill to Async Query Executor"); - asyncQueryExecutor.tell(PoisonPill.getInstance(), self()); - } - - if(resultSetActor != null && !resultSetActor.isTerminated()) { - LOG.debug("Sending poison pill to Resultset Actor"); - resultSetActor.tell(PoisonPill.getInstance(), self()); - } - } - - @Override - protected void notifyFailure() { - AsyncExecutionFailed failure = new AsyncExecutionFailed(jobId,username,"Cannot connect to hive"); - parent.tell(failure, self()); - } - - private void execute(AsyncJob message) { - this.executing = true; - this.jobId = message.getJobId(); - updateJobStatus(jobId,Job.JOB_STATE_INITIALIZED); - if (connectable == null) { - notifyAndCleanUp(); - return; - } - - Optional<HiveConnection> connectionOptional = connectable.getConnection(); - if (!connectionOptional.isPresent()) { - notifyAndCleanUp(); - return; - } - - try { - Optional<ResultSet> resultSetOptional = 
connectionDelegate.execute(connectionOptional.get(), message); - Optional<HiveStatement> currentStatement = connectionDelegate.getCurrentStatement(); - // There should be a result set, which either has a result set, or an empty value - // for operations which do not return anything - - logAggregator = getContext().actorOf( - Props.create(LogAggregator.class, system, hdfsApi, currentStatement.get(), message.getLogFile()) - .withDispatcher("akka.actor.misc-dispatcher"), message.getJobId() + ":" +"-logAggregator" - ); - deathWatch.tell(new RegisterActor(logAggregator),self()); - - updateGuidInJob(jobId, currentStatement.get()); - updateJobStatus(jobId,Job.JOB_STATE_RUNNING); - - if (resultSetOptional.isPresent()) { - // Start a result set aggregator on the same context, a notice to the parent will kill all these as well - // tell the result holder to assign the result set for further operations - resultSetActor = getContext().actorOf(Props.create(ResultSetIterator.class, self(), - resultSetOptional.get(),storage).withDispatcher("akka.actor.result-dispatcher"), - "ResultSetActor:ResultSetIterator:JobId:"+ jobId ); - deathWatch.tell(new RegisterActor(resultSetActor),self()); - parent.tell(new ResultReady(jobId,username, Either.<ActorRef, ActorRef>left(resultSetActor)), self()); - - // Start a actor to query ATS - } else { - // Case when this is an Update/query with no results - // Wait for operation to complete and add results; - - ActorRef asyncQueryExecutor = getContext().actorOf( - Props.create(AsyncQueryExecutor.class, parent, currentStatement.get(),storage,jobId,username) - .withDispatcher("akka.actor.result-dispatcher"), - message.getJobId() + "-asyncQueryExecutor"); - deathWatch.tell(new RegisterActor(asyncQueryExecutor),self()); - parent.tell(new ResultReady(jobId,username, Either.<ActorRef, ActorRef>right(asyncQueryExecutor)), self()); - - } - // Start a actor to query log - logAggregator.tell(new StartLogAggregation(), self()); - - - } catch (SQLException e) { 
- // update the error on the log - AsyncExecutionFailed failure = new AsyncExecutionFailed(message.getJobId(),username, - e.getMessage(), e); - updateJobStatus(jobId,Job.JOB_STATE_ERROR); - parent.tell(failure, self()); - // Update the operation controller to write an error on the right side - // make sure we can stop the connector - executing = false; - LOG.error("Caught SQL excpetion for job-"+message,e); - - } - - // Start Inactivity timer to close the statement - this.inactivityScheduler = system.scheduler().schedule( - Duration.Zero(), Duration.create(15 * 1000, TimeUnit.MILLISECONDS), - this.self(), new InactivityCheck(), system.dispatcher(), null); - } - - private void notifyAndCleanUp() { - updateJobStatus(jobId, Job.JOB_STATE_ERROR); - notifyFailure(); - cleanUp(); - } - - private void updateJobStatus(String jobId, String jobState) { - JobImpl job = null; - try { - job = storage.load(JobImpl.class, jobId); - } catch (ItemNotFound itemNotFound) { - itemNotFound.printStackTrace(); - } - job.setStatus(jobState); - storage.store(JobImpl.class, job); - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/60057378/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/AsyncQueryExecutor.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/AsyncQueryExecutor.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/AsyncQueryExecutor.java deleted file mode 100644 index 2a1dbb3..0000000 --- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/AsyncQueryExecutor.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ambari.view.hive2.actor; - -import akka.actor.ActorRef; -import org.apache.ambari.view.hive2.actor.message.ExecuteQuery; -import org.apache.ambari.view.hive2.actor.message.HiveMessage; -import org.apache.ambari.view.hive2.actor.message.job.AsyncExecutionFailed; -import org.apache.ambari.view.hive2.actor.message.job.ExecutionFailed; -import org.apache.ambari.view.hive2.actor.message.lifecycle.CleanUp; -import org.apache.ambari.view.hive2.internal.AsyncExecutionSuccess; -import org.apache.ambari.view.hive2.persistence.Storage; -import org.apache.ambari.view.hive2.persistence.utils.ItemNotFound; -import org.apache.ambari.view.hive2.resources.jobs.viewJobs.Job; -import org.apache.ambari.view.hive2.resources.jobs.viewJobs.JobImpl; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.sql.SQLException; -import java.sql.Statement; - -public class AsyncQueryExecutor extends HiveActor { - private final Logger LOG = LoggerFactory.getLogger(getClass()); - - private Statement statement; - private final Storage storage; - private final String jobId; - private final ActorRef parent; - private final String userName; - - public AsyncQueryExecutor(ActorRef parent, Statement statement, Storage storage, String jobId,String userName) { - this.statement = statement; - this.storage = storage; - this.jobId = jobId; - this.parent = parent; - this.userName = userName; 
- } - - @Override - public void handleMessage(HiveMessage hiveMessage) { - Object message = hiveMessage.getMessage(); - - if (message instanceof ExecuteQuery) { - executeQuery(); - } - - } - - private void executeQuery() { - JobImpl job = null; - try { - job = storage.load(JobImpl.class, jobId); - statement.getUpdateCount(); - LOG.info("Job execution successful. Setting status in db."); - job.setStatus(Job.JOB_STATE_FINISHED); - storage.store(JobImpl.class, job); - sender().tell(new AsyncExecutionSuccess(), self()); - - } catch (SQLException e) { - job.setStatus(Job.JOB_STATE_ERROR); - sender().tell(new AsyncExecutionFailed(jobId,userName, e.getMessage(), e), self()); - storage.store(JobImpl.class, job); - } catch (ItemNotFound itemNotFound) { - sender().tell(new AsyncExecutionFailed(jobId,userName, "Cannot load job", itemNotFound), self()); - } finally { - // We can clean up this connection here - parent.tell(new CleanUp(), self()); - } - - } - - -} - - - http://git-wip-us.apache.org/repos/asf/ambari/blob/60057378/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/GetResultHolder.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/GetResultHolder.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/GetResultHolder.java deleted file mode 100644 index c2ee5c7..0000000 --- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/GetResultHolder.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ambari.view.hive2.actor; - -public class GetResultHolder { - - private String jobId; - private String userName; - - public GetResultHolder(String jobId, String userName) { - this.jobId = jobId; - this.userName = userName; - } - - - public String getJobId() { - return jobId; - } - - public String getUserName() { - return userName; - } - - @Override - public String toString() { - return "GetResultHolder{" + - "jobId='" + jobId + '\'' + - ", userName='" + userName + '\'' + - '}'; - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/60057378/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/JdbcConnector.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/JdbcConnector.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/JdbcConnector.java index 7769dde..1894739 100644 --- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/JdbcConnector.java +++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/JdbcConnector.java @@ -19,15 +19,28 @@ package org.apache.ambari.view.hive2.actor; import akka.actor.ActorRef; -import akka.actor.ActorSystem; import akka.actor.Cancellable; import akka.actor.PoisonPill; +import akka.actor.Props; import com.google.common.base.Optional; import org.apache.ambari.view.ViewContext; import org.apache.ambari.view.hive2.ConnectionDelegate; import org.apache.ambari.view.hive2.actor.message.Connect; 
+import org.apache.ambari.view.hive2.actor.message.FetchError; +import org.apache.ambari.view.hive2.actor.message.FetchResult; +import org.apache.ambari.view.hive2.actor.message.GetColumnMetadataJob; +import org.apache.ambari.view.hive2.actor.message.HiveJob; import org.apache.ambari.view.hive2.actor.message.HiveMessage; -import org.apache.ambari.view.hive2.actor.message.JobExecutionCompleted; +import org.apache.ambari.view.hive2.actor.message.ResultInformation; +import org.apache.ambari.view.hive2.actor.message.ResultNotReady; +import org.apache.ambari.view.hive2.actor.message.RunStatement; +import org.apache.ambari.view.hive2.actor.message.SQLStatementJob; +import org.apache.ambari.view.hive2.actor.message.job.CancelJob; +import org.apache.ambari.view.hive2.actor.message.job.ExecuteNextStatement; +import org.apache.ambari.view.hive2.actor.message.job.ExecutionFailed; +import org.apache.ambari.view.hive2.actor.message.job.Failure; +import org.apache.ambari.view.hive2.actor.message.job.NoResult; +import org.apache.ambari.view.hive2.actor.message.job.ResultSetHolder; import org.apache.ambari.view.hive2.actor.message.lifecycle.CleanUp; import org.apache.ambari.view.hive2.actor.message.lifecycle.DestroyConnector; import org.apache.ambari.view.hive2.actor.message.lifecycle.FreeConnector; @@ -38,22 +51,28 @@ import org.apache.ambari.view.hive2.internal.Connectable; import org.apache.ambari.view.hive2.internal.ConnectionException; import org.apache.ambari.view.hive2.persistence.Storage; import org.apache.ambari.view.hive2.persistence.utils.ItemNotFound; +import org.apache.ambari.view.hive2.resources.jobs.viewJobs.Job; import org.apache.ambari.view.hive2.resources.jobs.viewJobs.JobImpl; import org.apache.ambari.view.hive2.utils.HiveActorConfiguration; import org.apache.ambari.view.utils.hdfs.HdfsApi; -import org.apache.hive.jdbc.HiveStatement; +import org.apache.hive.jdbc.HiveConnection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import 
scala.concurrent.duration.Duration; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayDeque; +import java.util.Queue; +import java.util.UUID; import java.util.concurrent.TimeUnit; /** * Wraps one Jdbc connection per user, per instance. This is used to delegate execute the statements and - * creates child actors to delegate the resultset extraction, YARN/ATS querying for ExecuteJob info and Log Aggregation + * creates child actors to delegate the ResultSet extraction, YARN/ATS querying for ExecuteJob info and Log Aggregation */ -public abstract class JdbcConnector extends HiveActor { +public class JdbcConnector extends HiveActor { private final Logger LOG = LoggerFactory.getLogger(getClass()); @@ -67,9 +86,7 @@ public abstract class JdbcConnector extends HiveActor { */ private static final long MAX_TERMINATION_INACTIVITY_INTERVAL = 10 * 60 * 1000; - protected final ViewContext viewContext; - protected final ActorSystem system; - protected final Storage storage; + private final Storage storage; /** * Keeps track of the timestamp when the last activity has happened. 
This is @@ -81,47 +98,57 @@ public abstract class JdbcConnector extends HiveActor { /** * Akka scheduler to tick at an interval to deal with inactivity of this actor */ - protected Cancellable inactivityScheduler; + private Cancellable inactivityScheduler; /** * Akka scheduler to tick at an interval to deal with the inactivity after which - * the actor should be killed and connectable should be released + * the actor should be killed and connection should be released */ - protected Cancellable terminateActorScheduler; + private Cancellable terminateActorScheduler; - protected Connectable connectable = null; - protected final ActorRef deathWatch; - protected final ConnectionDelegate connectionDelegate; - protected final ActorRef parent; - protected final HdfsApi hdfsApi; + private Connectable connectable = null; + private final ActorRef deathWatch; + private final ConnectionDelegate connectionDelegate; + private final ActorRef parent; + private ActorRef statementExecutor = null; + private final HdfsApi hdfsApi; /** * true if the actor is currently executing any job. */ - protected boolean executing = false; + private boolean executing = false; + private HiveJob.Type executionType = HiveJob.Type.SYNC; /** - * true if the currently executing job is async job. + * Returns the timeout configurations. */ - private boolean async = true; + private final HiveActorConfiguration actorConfiguration; + private String username; + private Optional<String> jobId = Optional.absent(); + private Optional<String> logFile = Optional.absent(); + private int statementsCount = 0; + + private ActorRef commandSender = null; + + private ActorRef resultSetIterator = null; + private boolean isFailure = false; + private Failure failure = null; + private boolean isCancelCalled = false; /** - * Returns the timeout configurations. 
+ * For every execution, this will hold the statements that are left to execute */ - private final HiveActorConfiguration actorConfiguration; - protected String username; - protected String jobId; + private Queue<String> statementQueue = new ArrayDeque<>(); - public JdbcConnector(ViewContext viewContext, HdfsApi hdfsApi, ActorSystem system, ActorRef parent, ActorRef deathWatch, + public JdbcConnector(ViewContext viewContext, ActorRef parent, ActorRef deathWatch, HdfsApi hdfsApi, ConnectionDelegate connectionDelegate, Storage storage) { - this.viewContext = viewContext; this.hdfsApi = hdfsApi; - this.system = system; this.parent = parent; this.deathWatch = deathWatch; this.connectionDelegate = connectionDelegate; this.storage = storage; this.lastActivityTimestamp = System.currentTimeMillis(); + resultSetIterator = null; actorConfiguration = new HiveActorConfiguration(viewContext); } @@ -136,8 +163,6 @@ public abstract class JdbcConnector extends HiveActor { keepAlive(); } else if (message instanceof CleanUp) { cleanUp(); - } else if (message instanceof JobExecutionCompleted) { - jobExecutionCompleted(); } else { handleNonLifecycleMessage(hiveMessage); } @@ -148,19 +173,205 @@ public abstract class JdbcConnector extends HiveActor { keepAlive(); if (message instanceof Connect) { connect((Connect) message); + } else if (message instanceof SQLStatementJob) { + runStatementJob((SQLStatementJob) message); + } else if (message instanceof GetColumnMetadataJob) { + runGetMetaData((GetColumnMetadataJob) message); + } else if (message instanceof ExecuteNextStatement) { + executeNextStatement(); + } else if (message instanceof ResultInformation) { + gotResultBack((ResultInformation) message); + } else if (message instanceof CancelJob) { + cancelJob((CancelJob) message); + } else if (message instanceof FetchResult) { + fetchResult((FetchResult) message); + } else if (message instanceof FetchError) { + fetchError((FetchError) message); + } else { + unhandled(message); + } + } + + 
private void fetchError(FetchError message) { + if (isFailure) { + sender().tell(Optional.of(failure), self()); + return; + } + sender().tell(Optional.absent(), self()); + } + + private void fetchResult(FetchResult message) { + if (isFailure) { + sender().tell(failure, self()); + return; + } + + if (executing) { + sender().tell(new ResultNotReady(jobId.get(), username), self()); + return; + } + sender().tell(Optional.fromNullable(resultSetIterator), self()); + } + + private void cancelJob(CancelJob message) { + if (!executing || connectionDelegate == null) { + LOG.error("Cannot cancel job for user as currently the job is not running or started. JobId: {}", message.getJobId()); + return; + } + LOG.info("Cancelling job for user. JobId: {}, user: {}", message.getJobId(), username); + try { + isCancelCalled = true; + connectionDelegate.cancel(); + } catch (SQLException e) { + LOG.error("Failed to cancel job. JobId: {}. {}", message.getJobId(), e); + } + } + + private void gotResultBack(ResultInformation message) { + Optional<Failure> failureOptional = message.getFailure(); + if (failureOptional.isPresent()) { + Failure failure = failureOptional.get(); + processFailure(failure); + return; + } + if (statementQueue.size() == 0) { + // This is the last resultSet + processResult(message.getResultSet()); } else { - handleJobMessage(hiveMessage); + self().tell(new ExecuteNextStatement(), self()); + } + } + + private void processCancel() { + executing = false; + if (isAsync() && jobId.isPresent()) { + LOG.error("Job canceled by user for JobId: {}", jobId.get()); + updateJobStatus(jobId.get(), Job.JOB_STATE_CANCELED); } + } + private void processFailure(Failure failure) { + executing = false; + isFailure = true; + this.failure = failure; + if (isAsync() && jobId.isPresent()) { + if(isCancelCalled) { + processCancel(); + return; + } + updateJobStatus(jobId.get(), Job.JOB_STATE_ERROR); + } else { + // Send for sync execution + commandSender.tell(new 
ExecutionFailed(failure.getMessage(), failure.getError()), self()); + cleanUpWithTermination(); + } } - protected abstract void handleJobMessage(HiveMessage message); + private void processResult(Optional<ResultSet> resultSetOptional) { + executing = false; - protected abstract boolean isAsync(); + if (isAsync() && jobId.isPresent()) { + updateJobStatus(jobId.get(), Job.JOB_STATE_FINISHED); + } - protected abstract void notifyFailure(); + if (resultSetOptional.isPresent()) { + ActorRef resultSetActor = getContext().actorOf(Props.create(ResultSetIterator.class, self(), + resultSetOptional.get(), isAsync()).withDispatcher("akka.actor.result-dispatcher"), + "ResultSetIterator:" + UUID.randomUUID().toString()); + resultSetIterator = resultSetActor; + if (!isAsync()) { + commandSender.tell(new ResultSetHolder(resultSetActor), self()); + } + } else { + resultSetIterator = null; + if (!isAsync()) { + commandSender.tell(new NoResult(), self()); + } + } + } - protected abstract void cleanUpChildren(); + private void executeNextStatement() { + if (statementQueue.isEmpty()) { + jobExecutionCompleted(); + return; + } + + int index = statementsCount - statementQueue.size(); + String statement = statementQueue.poll(); + if (statementExecutor == null) { + statementExecutor = getStatementExecutor(); + } + + if (isAsync()) { + statementExecutor.tell(new RunStatement(index, statement, jobId.get(), true, logFile.get(), true), self()); + } else { + statementExecutor.tell(new RunStatement(index, statement), self()); + } + } + + private void runStatementJob(SQLStatementJob message) { + executing = true; + jobId = message.getJobId(); + logFile = message.getLogFile(); + executionType = message.getType(); + commandSender = getSender(); + + if (!checkConnection()) return; + + for (String statement : message.getStatements()) { + statementQueue.add(statement); + } + statementsCount = statementQueue.size(); + + if (isAsync() && jobId.isPresent()) { + updateJobStatus(jobId.get(), 
Job.JOB_STATE_RUNNING); + startInactivityScheduler(); + } + self().tell(new ExecuteNextStatement(), self()); + } + + public boolean checkConnection() { + if (connectable == null) { + notifyConnectFailure(); + return false; + } + + Optional<HiveConnection> connectionOptional = connectable.getConnection(); + if (!connectionOptional.isPresent()) { + notifyConnectFailure(); + return false; + } + return true; + } + + private void runGetMetaData(GetColumnMetadataJob message) { + if (!checkConnection()) return; + executing = true; + executionType = message.getType(); + commandSender = getSender(); + statementExecutor = getStatementExecutor(); + statementExecutor.tell(message, self()); + } + + private ActorRef getStatementExecutor() { + return getContext().actorOf(Props.create(StatementExecutor.class, hdfsApi, storage, connectable.getConnection().get(), connectionDelegate).withDispatcher("akka.actor.result-dispatcher"), "StatementExecutor"); + } + + private boolean isAsync() { + return executionType == HiveJob.Type.ASYNC; + } + + private void notifyConnectFailure() { + executing = false; + isFailure = true; + this.failure = new Failure("Cannot connect to hive", new SQLException("Cannot connect to hive")); + if (isAsync()) { + updateJobStatus(jobId.get(), Job.JOB_STATE_ERROR); + } else { + sender().tell(new ExecutionFailed("Cannot connect to hive"), ActorRef.noSender()); + cleanUpWithTermination(); + } + } private void keepAlive() { lastActivityTimestamp = System.currentTimeMillis(); @@ -173,16 +384,14 @@ public abstract class JdbcConnector extends HiveActor { this.executing = false; } - protected Optional<String> getJobId() { - return Optional.fromNullable(jobId); - } - protected Optional<String> getUsername() { return Optional.fromNullable(username); } private void connect(Connect message) { - this.username = message.getUsername(); + username = message.getUsername(); + jobId = message.getJobId(); + executionType = message.getType(); // check the connectable if 
(connectable == null) { connectable = message.getConnectable(); @@ -195,30 +404,25 @@ public abstract class JdbcConnector extends HiveActor { } catch (ConnectionException e) { // set up job failure // notify parent about job failure - this.notifyFailure(); - cleanUp(); + notifyConnectFailure(); return; } - - this.terminateActorScheduler = system.scheduler().schedule( - Duration.Zero(), Duration.create(60 * 1000, TimeUnit.MILLISECONDS), - this.getSelf(), new TerminateInactivityCheck(), system.dispatcher(), null); - + startTerminateInactivityScheduler(); } - protected void updateGuidInJob(String jobId, HiveStatement statement) { - String yarnAtsGuid = statement.getYarnATSGuid(); + private void updateJobStatus(String jobid, String status) { try { - JobImpl job = storage.load(JobImpl.class, jobId); - job.setGuid(yarnAtsGuid); + JobImpl job = storage.load(JobImpl.class, jobid); + job.setStatus(status); storage.store(JobImpl.class, job); } catch (ItemNotFound itemNotFound) { - // Cannot do anything if the job is not present + // Cannot do anything } } + private void checkInactivity() { - LOG.info("Inactivity check, executing status: {}", executing); + LOG.debug("Inactivity check, executing status: {}", executing); if (executing) { keepAlive(); return; @@ -233,11 +437,11 @@ public abstract class JdbcConnector extends HiveActor { private void checkTerminationInactivity() { if (!isAsync()) { // Should not terminate if job is sync. Will terminate after the job is finished. 
- stopTeminateInactivityScheduler(); + stopTerminateInactivityScheduler(); return; } - LOG.info("Termination check, executing status: {}", executing); + LOG.debug("Termination check, executing status: {}", executing); if (executing) { keepAlive(); return; @@ -249,27 +453,26 @@ public abstract class JdbcConnector extends HiveActor { } } - protected void cleanUp() { - if(jobId != null) { - LOG.debug("{} :: Cleaning up resources for inactivity for jobId: {}", self().path().name(), jobId); + private void cleanUp() { + if (jobId.isPresent()) { + LOG.debug("{} :: Cleaning up resources for inactivity for jobId: {}", self().path().name(), jobId.get()); } else { LOG.debug("{} ::Cleaning up resources with inactivity for Sync execution.", self().path().name()); } this.executing = false; cleanUpStatementAndResultSet(); - cleanUpChildren(); stopInactivityScheduler(); - parent.tell(new FreeConnector(username, jobId, isAsync()), self()); + parent.tell(new FreeConnector(username, jobId.orNull(), isAsync()), self()); } - protected void cleanUpWithTermination() { - LOG.debug("{} :: Cleaning up resources with inactivity for Sync execution.", self().path().name()); + private void cleanUpWithTermination() { + this.executing = false; + LOG.debug("{} :: Cleaning up resources with inactivity for execution.", self().path().name()); cleanUpStatementAndResultSet(); - cleanUpChildren(); stopInactivityScheduler(); - stopTeminateInactivityScheduler(); - parent.tell(new DestroyConnector(username, jobId, isAsync()), this.self()); + stopTerminateInactivityScheduler(); + parent.tell(new DestroyConnector(username, jobId.orNull(), isAsync()), this.self()); self().tell(PoisonPill.getInstance(), ActorRef.noSender()); } @@ -279,12 +482,27 @@ public abstract class JdbcConnector extends HiveActor { connectionDelegate.closeResultSet(); } - private void stopTeminateInactivityScheduler() { + private void startTerminateInactivityScheduler() { + this.terminateActorScheduler = 
getContext().system().scheduler().schedule( + Duration.Zero(), Duration.create(60 * 1000, TimeUnit.MILLISECONDS), + this.getSelf(), new TerminateInactivityCheck(), getContext().dispatcher(), null); + } + + private void stopTerminateInactivityScheduler() { if (!(terminateActorScheduler == null || terminateActorScheduler.isCancelled())) { terminateActorScheduler.cancel(); } } + private void startInactivityScheduler() { + if (inactivityScheduler != null) { + inactivityScheduler.cancel(); + } + inactivityScheduler = getContext().system().scheduler().schedule( + Duration.Zero(), Duration.create(15 * 1000, TimeUnit.MILLISECONDS), + this.self(), new InactivityCheck(), getContext().dispatcher(), null); + } + private void stopInactivityScheduler() { if (!(inactivityScheduler == null || inactivityScheduler.isCancelled())) { inactivityScheduler.cancel(); @@ -294,12 +512,10 @@ public abstract class JdbcConnector extends HiveActor { @Override public void postStop() throws Exception { stopInactivityScheduler(); - stopTeminateInactivityScheduler(); + stopTerminateInactivityScheduler(); if (connectable.isOpen()) { connectable.disconnect(); } } - - -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/60057378/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/LogAggregator.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/LogAggregator.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/LogAggregator.java index 284345d..889611a 100644 --- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/LogAggregator.java +++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/LogAggregator.java @@ -49,13 +49,13 @@ public class LogAggregator extends HiveActor { private final HdfsApi hdfsApi; private final HiveStatement statement; private final String logFile; 
- private final ActorSystem system; private Cancellable moreLogsScheduler; private ActorRef parent; + private boolean hasStartedFetching = false; + private boolean shouldFetchMore = true; - public LogAggregator(ActorSystem system, HdfsApi hdfsApi, HiveStatement statement, String logFile) { - this.system = system; + public LogAggregator(HdfsApi hdfsApi, HiveStatement statement, String logFile) { this.hdfsApi = hdfsApi; this.statement = statement; this.logFile = logFile; @@ -82,25 +82,36 @@ public class LogAggregator extends HiveActor { private void start() { parent = this.getSender(); - this.moreLogsScheduler = system.scheduler().schedule( + hasStartedFetching = false; + shouldFetchMore = true; + if (!(moreLogsScheduler == null || moreLogsScheduler.isCancelled())) { + moreLogsScheduler.cancel(); + } + this.moreLogsScheduler = getContext().system().scheduler().schedule( Duration.Zero(), Duration.create(AGGREGATION_INTERVAL, TimeUnit.MILLISECONDS), - this.getSelf(), new GetMoreLogs(), system.dispatcher(), null); + this.getSelf(), new GetMoreLogs(), getContext().dispatcher(), null); } private void getMoreLogs() throws SQLException, HdfsApiException { - if (statement.hasMoreLogs()) { - List<String> logs = statement.getQueryLog(); + List<String> logs = statement.getQueryLog(); + if (logs.size() > 0 && shouldFetchMore) { String allLogs = Joiner.on("\n").skipNulls().join(logs); HdfsUtil.putStringToFile(hdfsApi, logFile, allLogs); + if(!statement.hasMoreLogs()) { + shouldFetchMore = false; + } } else { - moreLogsScheduler.cancel(); - parent.tell(new LogAggregationFinished(), ActorRef.noSender()); + // Cancel the timer only when log fetching has been started + if(!shouldFetchMore) { + moreLogsScheduler.cancel(); + parent.tell(new LogAggregationFinished(), ActorRef.noSender()); + } } } @Override public void postStop() throws Exception { - if(moreLogsScheduler != null && !moreLogsScheduler.isCancelled()){ + if (moreLogsScheduler != null && !moreLogsScheduler.isCancelled()) { 
moreLogsScheduler.cancel(); } http://git-wip-us.apache.org/repos/asf/ambari/blob/60057378/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/OperationController.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/OperationController.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/OperationController.java index ac62cf7..0681d55 100644 --- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/OperationController.java +++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/OperationController.java @@ -24,23 +24,22 @@ import akka.actor.Props; import com.google.common.base.Optional; import org.apache.ambari.view.ViewContext; import org.apache.ambari.view.hive2.ConnectionDelegate; -import org.apache.ambari.view.hive2.actor.message.AdvanceCursor; -import org.apache.ambari.view.hive2.actor.message.AsyncJob; import org.apache.ambari.view.hive2.actor.message.Connect; import org.apache.ambari.view.hive2.actor.message.ExecuteJob; -import org.apache.ambari.view.hive2.actor.message.ExecuteQuery; import org.apache.ambari.view.hive2.actor.message.FetchError; import org.apache.ambari.view.hive2.actor.message.FetchResult; import org.apache.ambari.view.hive2.actor.message.HiveJob; import org.apache.ambari.view.hive2.actor.message.HiveMessage; import org.apache.ambari.view.hive2.actor.message.JobRejected; import org.apache.ambari.view.hive2.actor.message.RegisterActor; +import org.apache.ambari.view.hive2.actor.message.ResultNotReady; import org.apache.ambari.view.hive2.actor.message.ResultReady; +import org.apache.ambari.view.hive2.actor.message.SQLStatementJob; import org.apache.ambari.view.hive2.actor.message.job.AsyncExecutionFailed; +import org.apache.ambari.view.hive2.actor.message.job.CancelJob; import org.apache.ambari.view.hive2.actor.message.lifecycle.DestroyConnector; 
import org.apache.ambari.view.hive2.actor.message.lifecycle.FreeConnector; import org.apache.ambari.view.hive2.internal.ContextSupplier; -import org.apache.ambari.view.hive2.internal.Either; import org.apache.ambari.view.hive2.persistence.Storage; import org.apache.ambari.view.hive2.utils.LoggingOutputStream; import org.apache.ambari.view.utils.hdfs.HdfsApi; @@ -87,7 +86,7 @@ public class OperationController extends HiveActor { /** * Store the connection per user/per job which are currently working. */ - private final Map<String, Map<String, ActorRefResultContainer>> asyncBusyConnections; + private final Map<String, Map<String, ActorRef>> asyncBusyConnections; /** * Store the connection per user which will be used to execute sync jobs @@ -118,22 +117,14 @@ public class OperationController extends HiveActor { if (message instanceof ExecuteJob) { ExecuteJob job = (ExecuteJob) message; if (job.getJob().getType() == HiveJob.Type.ASYNC) { - sendJob(job.getConnect(), (AsyncJob) job.getJob()); + sendJob(job.getConnect(), (SQLStatementJob) job.getJob()); } else if (job.getJob().getType() == HiveJob.Type.SYNC) { sendSyncJob(job.getConnect(), job.getJob()); } } - if (message instanceof ResultReady) { - updateResultContainer((ResultReady) message); - } - - if(message instanceof AsyncExecutionFailed){ - updateResultContainerWithError((AsyncExecutionFailed) message); - } - - if (message instanceof GetResultHolder) { - getResultHolder((GetResultHolder) message); + if (message instanceof CancelJob) { + cancelJob((CancelJob) message); } if (message instanceof FetchResult) { @@ -153,72 +144,38 @@ public class OperationController extends HiveActor { } } - private void fetchError(FetchError message) { + private void cancelJob(CancelJob message) { String jobId = message.getJobId(); String username = message.getUsername(); - ActorRefResultContainer container = asyncBusyConnections.get(username).get(jobId); - if(container.hasError){ - sender().tell(Optional.of(container.error), self()); 
- return; - } - sender().tell(Optional.absent(), self()); - } - - private void updateResultContainerWithError(AsyncExecutionFailed message) { - String userName = message.getUsername(); - String jobId = message.getJobId(); - ActorRefResultContainer container = asyncBusyConnections.get(userName).get(jobId); - container.hasError = true; - container.error = message; - } - - private void getResultHolder(GetResultHolder message) { - String userName = message.getUserName(); - String jobId = message.getJobId(); - if(asyncBusyConnections.containsKey(userName) && asyncBusyConnections.get(userName).containsKey(jobId)) - sender().tell(asyncBusyConnections.get(userName).get(jobId).result, self()); - else { - Either<ActorRef, AsyncExecutionFailed> right = Either.right(new AsyncExecutionFailed(message.getJobId(),userName, "Could not find the job, maybe the pool expired")); - sender().tell(right, self()); + ActorRef actorRef = asyncBusyConnections.get(username).get(jobId); + if (actorRef != null) { + actorRef.tell(message, sender()); + } else { + LOG.error("Failed to find a running job. 
Cannot cancel jobId: {}.", message.getJobId()); } } - private void updateResultContainer(ResultReady message) { - // set up result actor in container + private void fetchError(FetchError message) { String jobId = message.getJobId(); String username = message.getUsername(); - Either<ActorRef, ActorRef> result = message.getResult(); - asyncBusyConnections.get(username).get(jobId).result = result; - // start processing - if(message.getResult().isRight()){ - // Query with no result sets to be returned - // execute right away - result.getRight().tell(new ExecuteQuery(),self()); + ActorRef actorRef = asyncBusyConnections.get(username).get(jobId); + if(actorRef != null) { + actorRef.tell(message, sender()); } - if(result.isLeft()){ - // There is a result set to be processed - result.getLeft().tell(new AdvanceCursor(message.getJobId()),self()); - } - } private void fetchResultActorRef(FetchResult message) { - //Gets an Either actorRef,result implementation - // and send back to the caller String username = message.getUsername(); String jobId = message.getJobId(); - ActorRefResultContainer container = asyncBusyConnections.get(username).get(jobId); - if(container.hasError){ - sender().tell(container.error,self()); - return; + ActorRef actorRef = asyncBusyConnections.get(username).get(jobId); + if (actorRef != null) { + actorRef.tell(message, sender()); } - Either<ActorRef, ActorRef> result = container.result; - sender().tell(result,self()); } - private void sendJob(Connect connect, AsyncJob job) { + private void sendJob(Connect connect, SQLStatementJob job) { String username = job.getUsername(); - String jobId = job.getJobId(); + String jobId = job.getJobId().get(); ActorRef subActor = null; // Check if there is available actors to process this subActor = getActorRefFromAsyncPool(username); @@ -232,25 +189,24 @@ public class OperationController extends HiveActor { HdfsApi hdfsApi = hdfsApiOptional.get(); subActor = system.actorOf( - Props.create(AsyncJdbcConnector.class, 
viewContext, hdfsApi, system, self(), - deathWatch, connectionSupplier.get(viewContext), + Props.create(JdbcConnector.class, viewContext, self(), + deathWatch, hdfsApi, connectionSupplier.get(viewContext), storageSupplier.get(viewContext)).withDispatcher("akka.actor.jdbc-connector-dispatcher"), - "jobId:" + jobId + ":-asyncjdbcConnector"); - deathWatch.tell(new RegisterActor(subActor),self()); - + "jobId:" + jobId + ":asyncjdbcConnector"); + deathWatch.tell(new RegisterActor(subActor), self()); } if (asyncBusyConnections.containsKey(username)) { - Map<String, ActorRefResultContainer> actors = asyncBusyConnections.get(username); + Map<String, ActorRef> actors = asyncBusyConnections.get(username); if (!actors.containsKey(jobId)) { - actors.put(jobId, new ActorRefResultContainer(subActor)); + actors.put(jobId, subActor); } else { // Reject this as with the same jobId one connection is already in progress. sender().tell(new JobRejected(username, jobId, "Existing job in progress with same jobId."), ActorRef.noSender()); } } else { - Map<String, ActorRefResultContainer> actors = new HashMap<>(); - actors.put(jobId, new ActorRefResultContainer(subActor)); + Map<String, ActorRef> actors = new HashMap<>(); + actors.put(jobId, subActor); asyncBusyConnections.put(username, actors); } @@ -290,19 +246,18 @@ public class OperationController extends HiveActor { if (subActor == null) { Optional<HdfsApi> hdfsApiOptional = hdfsApiSupplier.get(viewContext); - if(!hdfsApiOptional.isPresent()){ - sender().tell(new JobRejected(username, ExecuteJob.SYNC_JOB_MARKER, "Failed to connect to HDFS."), ActorRef.noSender()); - return; - } + if (!hdfsApiOptional.isPresent()) { + sender().tell(new JobRejected(username, ExecuteJob.SYNC_JOB_MARKER, "Failed to connect to HDFS."), ActorRef.noSender()); + return; + } HdfsApi hdfsApi = hdfsApiOptional.get(); subActor = system.actorOf( - Props.create(SyncJdbcConnector.class, viewContext, hdfsApi, system, self(), - deathWatch, 
connectionSupplier.get(viewContext), + Props.create(JdbcConnector.class, viewContext, self(), + deathWatch, hdfsApi, connectionSupplier.get(viewContext), storageSupplier.get(viewContext)).withDispatcher("akka.actor.jdbc-connector-dispatcher"), - UUID.randomUUID().toString() + ":SyncjdbcConnector" ); - deathWatch.tell(new RegisterActor(subActor),self()); - + UUID.randomUUID().toString() + ":SyncjdbcConnector"); + deathWatch.tell(new RegisterActor(subActor), self()); } if (syncBusyConnections.containsKey(username)) { @@ -315,7 +270,7 @@ public class OperationController extends HiveActor { } // Termination requires that the ref is known in case of sync jobs - subActor.tell(connect, self()); + subActor.tell(connect, sender()); subActor.tell(job, sender()); } @@ -333,7 +288,7 @@ public class OperationController extends HiveActor { } private void freeConnector(FreeConnector message) { - LOG.info("About to free connector for job {} and user {}",message.getJobId(),message.getUsername()); + LOG.info("About to free connector for job {} and user {}", message.getJobId(), message.getUsername()); ActorRef sender = getSender(); if (message.isForAsync()) { Optional<ActorRef> refOptional = removeFromAsyncBusyPool(message.getUsername(), message.getJobId()); @@ -354,8 +309,8 @@ public class OperationController extends HiveActor { } private void logMaps() { - LOG.info("Pool status"); - LoggingOutputStream out = new LoggingOutputStream(LOG, LoggingOutputStream.LogLevel.INFO); + LOG.debug("Pool status"); + LoggingOutputStream out = new LoggingOutputStream(LOG, LoggingOutputStream.LogLevel.DEBUG); MapUtils.debugPrint(new PrintStream(out), "Busy Async connections", asyncBusyConnections); MapUtils.debugPrint(new PrintStream(out), "Available Async connections", asyncAvailableConnections); MapUtils.debugPrint(new PrintStream(out), "Busy Sync connections", syncBusyConnections); @@ -378,9 +333,9 @@ public class OperationController extends HiveActor { private Optional<ActorRef> 
removeFromAsyncBusyPool(String username, String jobId) { ActorRef ref = null; if (asyncBusyConnections.containsKey(username)) { - Map<String, ActorRefResultContainer> actors = asyncBusyConnections.get(username); + Map<String, ActorRef> actors = asyncBusyConnections.get(username); if (actors.containsKey(jobId)) { - ref = actors.get(jobId).actorRef; + ref = actors.get(jobId); actors.remove(jobId); } } @@ -420,19 +375,6 @@ public class OperationController extends HiveActor { actors.remove(sender); } - private static class ActorRefResultContainer { - - ActorRef actorRef; - boolean hasError = false; - Either<ActorRef, ActorRef> result = Either.none(); - AsyncExecutionFailed error; - - public ActorRefResultContainer(ActorRef actorRef) { - this.actorRef = actorRef; - } - } - - } http://git-wip-us.apache.org/repos/asf/ambari/blob/60057378/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/ResultSetIterator.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/ResultSetIterator.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/ResultSetIterator.java index e883768..afab6c9 100644 --- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/ResultSetIterator.java +++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/ResultSetIterator.java @@ -21,23 +21,18 @@ package org.apache.ambari.view.hive2.actor; import akka.actor.ActorRef; import com.google.common.collect.Lists; import org.apache.ambari.view.hive2.actor.message.CursorReset; -import org.apache.ambari.view.hive2.actor.message.JobExecutionCompleted; -import org.apache.ambari.view.hive2.actor.message.ResetCursor; -import org.apache.ambari.view.hive2.client.ColumnDescription; -import org.apache.ambari.view.hive2.client.ColumnDescriptionShort; -import org.apache.ambari.view.hive2.client.Row; -import 
org.apache.ambari.view.hive2.persistence.Storage; -import org.apache.ambari.view.hive2.persistence.utils.ItemNotFound; -import org.apache.ambari.view.hive2.resources.jobs.viewJobs.Job; -import org.apache.ambari.view.hive2.resources.jobs.viewJobs.JobImpl; -import org.apache.ambari.view.hive2.actor.message.AdvanceCursor; import org.apache.ambari.view.hive2.actor.message.HiveMessage; +import org.apache.ambari.view.hive2.actor.message.ResetCursor; import org.apache.ambari.view.hive2.actor.message.job.FetchFailed; import org.apache.ambari.view.hive2.actor.message.job.Next; import org.apache.ambari.view.hive2.actor.message.job.NoMoreItems; import org.apache.ambari.view.hive2.actor.message.job.Result; import org.apache.ambari.view.hive2.actor.message.lifecycle.CleanUp; import org.apache.ambari.view.hive2.actor.message.lifecycle.KeepAlive; +import org.apache.ambari.view.hive2.client.ColumnDescription; +import org.apache.ambari.view.hive2.client.ColumnDescriptionShort; +import org.apache.ambari.view.hive2.client.Row; +import org.apache.ambari.view.hive2.persistence.Storage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,33 +53,26 @@ public class ResultSetIterator extends HiveActor { private List<ColumnDescription> columnDescriptions; private int columnCount; - private Storage storage; boolean async = false; - private boolean jobCompleteMessageSent = false; - - private boolean metaDataFetched = false; - public ResultSetIterator(ActorRef parent, ResultSet resultSet, int batchSize) { + public ResultSetIterator(ActorRef parent, ResultSet resultSet, int batchSize, boolean isAsync) { this.parent = parent; this.resultSet = resultSet; this.batchSize = batchSize; + this.async = isAsync; } - - public ResultSetIterator(ActorRef parent, ResultSet resultSet, Storage storage) { - this(parent, resultSet); - this.storage = storage; - this.async = true; + public ResultSetIterator(ActorRef parent, ResultSet resultSet) { + this(parent, resultSet, DEFAULT_BATCH_SIZE, true); } 
- public ResultSetIterator(ActorRef parent, ResultSet resultSet) { - this(parent, resultSet, DEFAULT_BATCH_SIZE); + public ResultSetIterator(ActorRef parent, ResultSet resultSet, boolean isAsync) { + this(parent, resultSet, DEFAULT_BATCH_SIZE, isAsync); } @Override void handleMessage(HiveMessage hiveMessage) { - LOG.info("Result set Iterator wil handle message {}", hiveMessage); sendKeepAlive(); Object message = hiveMessage.getMessage(); if (message instanceof Next) { @@ -97,39 +85,6 @@ public class ResultSetIterator extends HiveActor { if (message instanceof KeepAlive) { sendKeepAlive(); } - if (message instanceof AdvanceCursor) { - AdvanceCursor moveCursor = (AdvanceCursor) message; - advanceCursor(moveCursor); - } - - } - - private void advanceCursor(AdvanceCursor moveCursor) { - String jobid = moveCursor.getJob(); - try { - // Block here so that we can update the job status - resultSet.next(); - // Resetting the resultset as it needs to fetch from the beginning when the result is asked for. - resultSet.beforeFirst(); - LOG.info("Job execution successful. Setting status in db."); - updateJobStatus(jobid, Job.JOB_STATE_FINISHED); - sendJobCompleteMessageIfNotDone(); - } catch (SQLException e) { - LOG.error("Failed to reset the cursor after advancing. 
Setting error state in db.", e); - updateJobStatus(jobid, Job.JOB_STATE_ERROR); - sender().tell(new FetchFailed("Failed to reset the cursor after advancing", e), self()); - cleanUpResources(); - } - } - - private void updateJobStatus(String jobid, String status) { - try { - JobImpl job = storage.load(JobImpl.class, jobid); - job.setStatus(status); - storage.store(JobImpl.class, job); - } catch (ItemNotFound itemNotFound) { - // Cannot do anything - } } private void resetResultSet() { @@ -164,7 +119,6 @@ public class ResultSetIterator extends HiveActor { while (resultSet.next() && index < batchSize) { index++; rows.add(getRowFromResultSet(resultSet)); - sendJobCompleteMessageIfNotDone(); } if (index == 0) { @@ -185,13 +139,6 @@ public class ResultSetIterator extends HiveActor { } } - private void sendJobCompleteMessageIfNotDone() { - if (!jobCompleteMessageSent) { - jobCompleteMessageSent = true; - parent.tell(new JobExecutionCompleted(), self()); - } - } - private void cleanUpResources() { parent.tell(new CleanUp(), self()); } http://git-wip-us.apache.org/repos/asf/ambari/blob/60057378/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/StatementExecutor.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/StatementExecutor.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/StatementExecutor.java new file mode 100644 index 0000000..c60f28b --- /dev/null +++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/StatementExecutor.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.view.hive2.actor; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; +import com.google.common.base.Optional; +import org.apache.ambari.view.hive2.ConnectionDelegate; +import org.apache.ambari.view.hive2.actor.message.GetColumnMetadataJob; +import org.apache.ambari.view.hive2.actor.message.HiveMessage; +import org.apache.ambari.view.hive2.actor.message.ResultInformation; +import org.apache.ambari.view.hive2.actor.message.RunStatement; +import org.apache.ambari.view.hive2.actor.message.StartLogAggregation; +import org.apache.ambari.view.hive2.actor.message.job.Failure; +import org.apache.ambari.view.hive2.actor.message.job.UpdateYarnAtsGuid; +import org.apache.ambari.view.hive2.persistence.Storage; +import org.apache.ambari.view.utils.hdfs.HdfsApi; +import org.apache.hive.jdbc.HiveConnection; +import org.apache.hive.jdbc.HiveStatement; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.UUID; + +/** + * Executes a single statement and returns the ResultSet if the statements generates ResultSet. + * Also, starts logAggregation and YarnAtsGuidFetcher if they are required. 
+ */ +public class StatementExecutor extends HiveActor { + + private final Logger LOG = LoggerFactory.getLogger(getClass()); + + private final HdfsApi hdfsApi; + private final HiveConnection connection; + protected final Storage storage; + private final ConnectionDelegate connectionDelegate; + private ActorRef logAggregator; + private ActorRef guidFetcher; + + + public StatementExecutor(HdfsApi hdfsApi, Storage storage, HiveConnection connection, ConnectionDelegate connectionDelegate) { + this.hdfsApi = hdfsApi; + this.storage = storage; + this.connection = connection; + this.connectionDelegate = connectionDelegate; + } + + @Override + void handleMessage(HiveMessage hiveMessage) { + Object message = hiveMessage.getMessage(); + if (message instanceof RunStatement) { + runStatement((RunStatement) message); + } else if (message instanceof GetColumnMetadataJob) { + getColumnMetaData((GetColumnMetadataJob) message); + } + } + + private void runStatement(RunStatement message) { + try { + HiveStatement statement = connectionDelegate.createStatement(connection); + if (message.shouldStartLogAggregation()) { + startLogAggregation(statement, message.getStatement(), message.getLogFile().get()); + } + + if (message.shouldStartGUIDFetch() && message.getJobId().isPresent()) { + startGUIDFetch(statement, message.getJobId().get()); + } + Optional<ResultSet> resultSetOptional = connectionDelegate.execute(message.getStatement()); + + if (resultSetOptional.isPresent()) { + sender().tell(new ResultInformation(message.getId(), resultSetOptional.get()), self()); + } else { + sender().tell(new ResultInformation(message.getId()), self()); + } + } catch (SQLException e) { + LOG.error("Failed to execute statement: {}. 
{}", message.getStatement(), e); + sender().tell(new ResultInformation(message.getId(), new Failure("Failed to execute statement: " + message.getStatement(), e)), self()); + } finally { + stopLogAggregation(); + stopGUIDFetch(); + } + } + + private void startGUIDFetch(HiveStatement statement, String jobId) { + if (guidFetcher == null) { + guidFetcher = getContext().actorOf(Props.create(YarnAtsGUIDFetcher.class, storage) + .withDispatcher("akka.actor.misc-dispatcher"), "YarnAtsGUIDFetcher:" + UUID.randomUUID().toString()); + } + guidFetcher.tell(new UpdateYarnAtsGuid(statement, jobId), self()); + } + + private void stopGUIDFetch() { + if (guidFetcher != null) { + getContext().stop(guidFetcher); + } + guidFetcher = null; + } + + private void startLogAggregation(HiveStatement statement, String sqlStatement, String logFile) { + if (logAggregator == null) { + logAggregator = getContext().actorOf( + Props.create(LogAggregator.class, hdfsApi, statement, logFile) + .withDispatcher("akka.actor.misc-dispatcher"), "LogAggregator:" + UUID.randomUUID().toString()); + } + logAggregator.tell(new StartLogAggregation(sqlStatement), getSelf()); + } + + private void stopLogAggregation() { + if (logAggregator != null) { + getContext().stop(logAggregator); + } + logAggregator = null; + } + + + private void getColumnMetaData(GetColumnMetadataJob message) { + try { + ResultSet resultSet = connectionDelegate.getColumnMetadata(connection, message); + sender().tell(new ResultInformation(-1, resultSet), self()); + } catch (SQLException e) { + LOG.error("Failed to get column metadata for databasePattern: {}, tablePattern: {}, ColumnPattern {}. 
{}", + message.getSchemaPattern(), message.getTablePattern(), message.getColumnPattern(), e); + sender().tell(new ResultInformation(-1, + new Failure("Failed to get column metadata for databasePattern: " + message.getSchemaPattern() + + ", tablePattern: " + message.getTablePattern() + ", ColumnPattern: " + message.getColumnPattern(), e)), self()); + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/60057378/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/SyncJdbcConnector.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/SyncJdbcConnector.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/SyncJdbcConnector.java deleted file mode 100644 index a0b6eae..0000000 --- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/SyncJdbcConnector.java +++ /dev/null @@ -1,174 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ambari.view.hive2.actor; - -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.actor.PoisonPill; -import akka.actor.Props; -import com.google.common.base.Optional; -import org.apache.ambari.view.ViewContext; -import org.apache.ambari.view.hive2.actor.message.RegisterActor; -import org.apache.ambari.view.hive2.persistence.Storage; -import org.apache.ambari.view.hive2.ConnectionDelegate; -import org.apache.ambari.view.hive2.actor.message.GetColumnMetadataJob; -import org.apache.ambari.view.hive2.actor.message.HiveMessage; -import org.apache.ambari.view.hive2.actor.message.SyncJob; -import org.apache.ambari.view.hive2.actor.message.job.ExecutionFailed; -import org.apache.ambari.view.hive2.actor.message.job.NoResult; -import org.apache.ambari.view.hive2.actor.message.job.ResultSetHolder; -import org.apache.ambari.view.utils.hdfs.HdfsApi; -import org.apache.hive.jdbc.HiveConnection; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.sql.ResultSet; -import java.sql.SQLException; - -public class SyncJdbcConnector extends JdbcConnector { - - private final Logger LOG = LoggerFactory.getLogger(getClass()); - private ActorRef resultSetActor = null; - - public SyncJdbcConnector(ViewContext viewContext, HdfsApi hdfsApi, ActorSystem system, ActorRef parent,ActorRef deathWatch, ConnectionDelegate connectionDelegate, Storage storage) { - super(viewContext, hdfsApi, system, parent,deathWatch, connectionDelegate, storage); - } - - @Override - protected void handleJobMessage(HiveMessage message) { - Object job = message.getMessage(); - if(job instanceof SyncJob) { - execute((SyncJob) job); - } else if (job instanceof GetColumnMetadataJob) { - getColumnMetaData((GetColumnMetadataJob) job); - } - } - - @Override - protected boolean isAsync() { - return false; - } - - @Override - protected void cleanUpChildren() { - if(resultSetActor != null && !resultSetActor.isTerminated()) { - LOG.debug("Sending poison pill to 
log aggregator"); - resultSetActor.tell(PoisonPill.getInstance(), self()); - } - } - - @Override - protected void notifyFailure() { - sender().tell(new ExecutionFailed("Cannot connect to hive"), ActorRef.noSender()); - } - - protected void execute(final SyncJob job) { - this.executing = true; - executeJob(new Operation<SyncJob>() { - @Override - SyncJob getJob() { - return job; - } - - @Override - Optional<ResultSet> call(HiveConnection connection) throws SQLException { - return connectionDelegate.executeSync(connection, job); - } - - @Override - String notConnectedErrorMessage() { - return "Cannot execute sync job for user: " + job.getUsername() + ". Not connected to Hive"; - } - - @Override - String executionFailedErrorMessage() { - return "Failed to execute Jdbc Statement"; - } - }); - } - - - private void getColumnMetaData(final GetColumnMetadataJob job) { - executeJob(new Operation<GetColumnMetadataJob>() { - - @Override - GetColumnMetadataJob getJob() { - return job; - } - - @Override - Optional<ResultSet> call(HiveConnection connection) throws SQLException { - return connectionDelegate.getColumnMetadata(connection, job); - } - - @Override - String notConnectedErrorMessage() { - return String.format("Cannot get column metadata for user: %s, schema: %s, table: %s, column: %s" + - ". 
Not connected to Hive", job.getUsername(), job.getSchemaPattern(), job.getTablePattern(), - job.getColumnPattern()); - } - - @Override - String executionFailedErrorMessage() { - return "Failed to execute Jdbc Statement"; - } - }); - } - - private void executeJob(Operation operation) { - ActorRef sender = this.getSender(); - String errorMessage = operation.notConnectedErrorMessage(); - if (connectable == null) { - sender.tell(new ExecutionFailed(errorMessage), ActorRef.noSender()); - cleanUp(); - return; - } - - Optional<HiveConnection> connectionOptional = connectable.getConnection(); - if (!connectionOptional.isPresent()) { - sender.tell(new ExecutionFailed(errorMessage), ActorRef.noSender()); - cleanUp(); - return; - } - - try { - Optional<ResultSet> resultSetOptional = operation.call(connectionOptional.get()); - if(resultSetOptional.isPresent()) { - ActorRef resultSetActor = getContext().actorOf(Props.create(ResultSetIterator.class, self(), - resultSetOptional.get()).withDispatcher("akka.actor.result-dispatcher")); - deathWatch.tell(new RegisterActor(resultSetActor),self()); - sender.tell(new ResultSetHolder(resultSetActor), self()); - } else { - sender.tell(new NoResult(), self()); - cleanUp(); - } - } catch (SQLException e) { - LOG.error(operation.executionFailedErrorMessage(), e); - sender.tell(new ExecutionFailed(operation.executionFailedErrorMessage(), e), self()); - cleanUp(); - } - } - - private abstract class Operation<T> { - abstract T getJob(); - abstract Optional<ResultSet> call(HiveConnection connection) throws SQLException; - abstract String notConnectedErrorMessage(); - abstract String executionFailedErrorMessage(); - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/60057378/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/YarnAtsGUIDFetcher.java ---------------------------------------------------------------------- diff --git 
a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/YarnAtsGUIDFetcher.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/YarnAtsGUIDFetcher.java new file mode 100644 index 0000000..bd70421 --- /dev/null +++ b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/YarnAtsGUIDFetcher.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ambari.view.hive2.actor; + +import org.apache.ambari.view.hive2.actor.message.HiveMessage; +import org.apache.ambari.view.hive2.actor.message.job.UpdateYarnAtsGuid; +import org.apache.ambari.view.hive2.persistence.Storage; +import org.apache.ambari.view.hive2.persistence.utils.ItemNotFound; +import org.apache.ambari.view.hive2.resources.jobs.viewJobs.JobImpl; +import org.apache.hive.jdbc.HiveStatement; +import scala.concurrent.duration.Duration; + +import java.util.concurrent.TimeUnit; + +/** + * Queries YARN/ATS time to time to fetch the status of the ExecuteJob and updates database + */ +public class YarnAtsGUIDFetcher extends HiveActor { + + private final Storage storage; + + public YarnAtsGUIDFetcher(Storage storage) { + this.storage = storage; + } + + @Override + public void handleMessage(HiveMessage hiveMessage) { + Object message = hiveMessage.getMessage(); + if(message instanceof UpdateYarnAtsGuid) { + updateGuid((UpdateYarnAtsGuid) message); + } + } + + private void updateGuid(UpdateYarnAtsGuid message) { + HiveStatement statement = message.getStatement(); + String jobId = message.getJobId(); + String yarnAtsGuid = statement.getYarnATSGuid(); + + // If ATS GUID is not yet generated, we will retry after 1 second + if(yarnAtsGuid == null) { + getContext().system().scheduler() + .scheduleOnce(Duration.create(1, TimeUnit.SECONDS), getSelf(), message, getContext().dispatcher(), null); + } else { + try { + JobImpl job = storage.load(JobImpl.class, jobId); + job.setGuid(yarnAtsGuid); + storage.store(JobImpl.class, job); + } catch (ItemNotFound itemNotFound) { + // Cannot do anything if the job is not present + } + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/60057378/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/YarnAtsParser.java ---------------------------------------------------------------------- diff --git 
a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/YarnAtsParser.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/YarnAtsParser.java deleted file mode 100644 index 0f918ad..0000000 --- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/YarnAtsParser.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ambari.view.hive2.actor; - -import akka.actor.UntypedActor; -import org.apache.ambari.view.hive2.actor.message.HiveMessage; - -/** - * Queries YARN/ATS time to time to fetch the status of the ExecuteJob and updates database - */ -public class YarnAtsParser extends HiveActor { - @Override - public void handleMessage(HiveMessage hiveMessage) { - - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/60057378/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/AdvanceCursor.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/AdvanceCursor.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/AdvanceCursor.java deleted file mode 100644 index c3e6c04..0000000 --- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/AdvanceCursor.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ambari.view.hive2.actor.message; - -public class AdvanceCursor { - - private String job; - - public AdvanceCursor(String job) { - this.job = job; - } - - public String getJob() { - return job; - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/60057378/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/AssignResultSet.java ---------------------------------------------------------------------- diff --git a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/AssignResultSet.java b/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/AssignResultSet.java deleted file mode 100644 index fd1f26f..0000000 --- a/contrib/views/hive-next/src/main/java/org/apache/ambari/view/hive2/actor/message/AssignResultSet.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ambari.view.hive2.actor.message; - -import com.google.common.base.Optional; - -import java.sql.ResultSet; - -public class AssignResultSet { - - private Optional<ResultSet> resultSet; - - - public AssignResultSet(Optional<ResultSet> resultSet) { - this.resultSet = resultSet; - - } - - - public ResultSet getResultSet() { - return resultSet.orNull(); - } - - - @Override - public String toString() { - return "ExtractResultSet{" + - "resultSet=" + resultSet + - '}'; - } - -}
