Repository: kylin
Updated Branches:
  refs/heads/master e80b13ff1 -> b7ae177f4
Find a better way to check hadoop job status via job API Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/b7ae177f Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/b7ae177f Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/b7ae177f Branch: refs/heads/master Commit: b7ae177f4c7f0e3add9b0ea612a6ac1c9d4d8730 Parents: e80b13f Author: kyotoYaho <nju_y...@apache.org> Authored: Wed May 4 14:48:44 2016 +0800 Committer: kyotoYaho <nju_y...@apache.org> Committed: Mon May 16 18:58:52 2016 +0800 ---------------------------------------------------------------------- .../mr/common/HadoopJobStatusChecker.java | 64 ++++++++++++++++++++ .../engine/mr/common/MapReduceExecutable.java | 18 +++--- 2 files changed, 73 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/b7ae177f/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopJobStatusChecker.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopJobStatusChecker.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopJobStatusChecker.java new file mode 100644 index 0000000..f25d41f --- /dev/null +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/HadoopJobStatusChecker.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.engine.mr.common; + +import org.apache.hadoop.mapreduce.Job; +import org.apache.kylin.job.constant.JobStepStatusEnum; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class HadoopJobStatusChecker { + + protected static final Logger logger = LoggerFactory.getLogger(HadoopJobStatusChecker.class); + + public static JobStepStatusEnum checkStatus(Job job, StringBuilder output) { + if (job == null || job.getJobID() == null) { + output.append("Skip status check with empty job id..\n"); + return JobStepStatusEnum.WAITING; + } + + JobStepStatusEnum status = null; + try { + switch (job.getStatus().getState()) { + case SUCCEEDED: + status = JobStepStatusEnum.FINISHED; + break; + case FAILED: + status = JobStepStatusEnum.ERROR; + break; + case KILLED: + status = JobStepStatusEnum.KILLED; + break; + case RUNNING: + status = JobStepStatusEnum.RUNNING; + break; + case PREP: + status = JobStepStatusEnum.WAITING; + break; + } + } catch (Exception e) { + logger.error("error check status", e); + output.append("Exception: " + e.getLocalizedMessage() + "\n"); + status = JobStepStatusEnum.ERROR; + } + + return status; + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/b7ae177f/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java index 
54459d7..69fd03a 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java @@ -134,17 +134,17 @@ public class MapReduceExecutable extends AbstractExecutable { final StringBuilder output = new StringBuilder(); final HadoopCmdOutput hadoopCmdOutput = new HadoopCmdOutput(job, output); - final String restStatusCheckUrl = getRestStatusCheckUrl(job, context.getConfig()); - if (restStatusCheckUrl == null) { - logger.error("restStatusCheckUrl is null"); - return new ExecuteResult(ExecuteResult.State.ERROR, "restStatusCheckUrl is null"); - } - String mrJobId = hadoopCmdOutput.getMrJobId(); - boolean useKerberosAuth = context.getConfig().isGetJobStatusWithKerberos(); - HadoopStatusChecker statusChecker = new HadoopStatusChecker(restStatusCheckUrl, mrJobId, output, useKerberosAuth); +// final String restStatusCheckUrl = getRestStatusCheckUrl(job, context.getConfig()); +// if (restStatusCheckUrl == null) { +// logger.error("restStatusCheckUrl is null"); +// return new ExecuteResult(ExecuteResult.State.ERROR, "restStatusCheckUrl is null"); +// } +// String mrJobId = hadoopCmdOutput.getMrJobId(); +// boolean useKerberosAuth = context.getConfig().isGetJobStatusWithKerberos(); +// HadoopStatusChecker statusChecker = new HadoopStatusChecker(restStatusCheckUrl, mrJobId, output, useKerberosAuth); JobStepStatusEnum status = JobStepStatusEnum.NEW; while (!isDiscarded()) { - JobStepStatusEnum newStatus = statusChecker.checkStatus(); + JobStepStatusEnum newStatus = HadoopJobStatusChecker.checkStatus(job, output); if (status == JobStepStatusEnum.KILLED) { executableManager.updateJobOutput(getId(), ExecutableState.ERROR, Collections.<String, String> emptyMap(), "killed by admin"); return new ExecuteResult(ExecuteResult.State.FAILED, "killed by admin");