KYLIN-2707 Add more logging and improve fault tolerance
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/2ee76384 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/2ee76384 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/2ee76384 Branch: refs/heads/master Commit: 2ee76384d9d2b9e432424c7439f056b9ba11b12f Parents: 3b74cea Author: lidongsjtu <lid...@apache.org> Authored: Thu Sep 28 11:49:32 2017 +0800 Committer: Li Yang <liy...@apache.org> Committed: Thu Sep 28 16:38:39 2017 +0800 ---------------------------------------------------------------------- .../java/org/apache/kylin/job/JobInstance.java | 5 +- .../engine/mr/common/JobInfoConverter.java | 32 +++++-- .../engine/mr/common/JobInfoConverterTest.java | 90 ++++++++++++++++++ .../apache/kylin/rest/service/JobService.java | 99 +++++++++++--------- 4 files changed, 172 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/2ee76384/core-job/src/main/java/org/apache/kylin/job/JobInstance.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/JobInstance.java b/core-job/src/main/java/org/apache/kylin/job/JobInstance.java index bbbbb94..e7c0aa0 100644 --- a/core-job/src/main/java/org/apache/kylin/job/JobInstance.java +++ b/core-job/src/main/java/org/apache/kylin/job/JobInstance.java @@ -72,7 +72,8 @@ public class JobInstance extends RootPersistentEntity implements Comparable<JobI public JobStep getRunningStep() { for (JobStep step : this.getSteps()) { - if (step.getStatus().equals(JobStepStatusEnum.RUNNING) || step.getStatus().equals(JobStepStatusEnum.WAITING)) { + if (step.getStatus().equals(JobStepStatusEnum.RUNNING) + || step.getStatus().equals(JobStepStatusEnum.WAITING)) { return step; } } @@ -302,7 +303,7 @@ public class JobInstance extends RootPersistentEntity implements Comparable<JobI private long execWaitTime; 
@JsonProperty("step_status") - private JobStepStatusEnum status; + private JobStepStatusEnum status = JobStepStatusEnum.PENDING; @JsonProperty("cmd_type") private JobStepCmdTypeEnum cmdType = JobStepCmdTypeEnum.SHELL_CMD_HADOOP; http://git-wip-us.apache.org/repos/asf/kylin/blob/2ee76384/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java index ec5aef1..9b8400c 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java @@ -30,15 +30,32 @@ import org.apache.kylin.job.constant.JobStepStatusEnum; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.ExecutableState; import org.apache.kylin.job.execution.Output; - -import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class JobInfoConverter { + private static final Logger logger = LoggerFactory.getLogger(JobInfoConverter.class); + + public static JobInstance parseToJobInstanceQuietly(AbstractExecutable job, Map<String, Output> outputs) { + try { + return parseToJobInstance(job, outputs); + } catch (Exception e) { + logger.error("Failed to parse job instance: uuid={}", job, e); + return null; + } + } + public static JobInstance parseToJobInstance(AbstractExecutable job, Map<String, Output> outputs) { if (job == null) { + logger.warn("job is null."); return null; } - Preconditions.checkState(job instanceof CubingJob, "illegal job type, id:" + job.getId()); + + if (!(job instanceof CubingJob)) { + logger.warn("illegal job type, id:" + job.getId()); + return null; + } + CubingJob cubeJob = (CubingJob) job; Output output = 
outputs.get(job.getId()); final JobInstance result = new JobInstance(); @@ -54,7 +71,8 @@ public class JobInfoConverter { result.setExecStartTime(AbstractExecutable.getStartTime(output)); result.setExecEndTime(AbstractExecutable.getEndTime(output)); result.setExecInterruptTime(AbstractExecutable.getInterruptTime(output)); - result.setDuration(AbstractExecutable.getDuration(result.getExecStartTime(), result.getExecEndTime(), result.getExecInterruptTime()) / 1000); + result.setDuration(AbstractExecutable.getDuration(result.getExecStartTime(), result.getExecEndTime(), + result.getExecInterruptTime()) / 1000); for (int i = 0; i < cubeJob.getTasks().size(); ++i) { AbstractExecutable task = cubeJob.getTasks().get(i); result.addStep(parseToJobStep(task, i, outputs.get(task.getId()))); @@ -69,7 +87,7 @@ public class JobInfoConverter { result.setSequenceID(i); if (stepOutput == null) { - result.setStatus(JobStepStatusEnum.ERROR); + logger.warn("Cannot found output for task: id={}", task.getId()); return result; } @@ -86,7 +104,9 @@ public class JobInfoConverter { } if (task instanceof MapReduceExecutable) { result.setExecCmd(((MapReduceExecutable) task).getMapReduceParams()); - result.setExecWaitTime(AbstractExecutable.getExtraInfoAsLong(stepOutput, MapReduceExecutable.MAP_REDUCE_WAIT_TIME, 0L) / 1000); + result.setExecWaitTime( + AbstractExecutable.getExtraInfoAsLong(stepOutput, MapReduceExecutable.MAP_REDUCE_WAIT_TIME, 0L) + / 1000); } if (task instanceof HadoopShellExecutable) { result.setExecCmd(((HadoopShellExecutable) task).getJobParams()); http://git-wip-us.apache.org/repos/asf/kylin/blob/2ee76384/engine-mr/src/test/java/org/apache/kylin/engine/mr/common/JobInfoConverterTest.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/common/JobInfoConverterTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/common/JobInfoConverterTest.java new file mode 100644 index 
0000000..013ab0b --- /dev/null +++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/common/JobInfoConverterTest.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.engine.mr.common; + +import java.util.Map; + +import org.apache.kylin.engine.mr.CubingJob; +import org.apache.kylin.job.JobInstance; +import org.apache.kylin.job.constant.JobStepStatusEnum; +import org.apache.kylin.job.exception.ExecuteException; +import org.apache.kylin.job.execution.ExecutableContext; +import org.apache.kylin.job.execution.ExecutableState; +import org.apache.kylin.job.execution.ExecuteResult; +import org.apache.kylin.job.execution.Output; +import org.junit.Assert; +import org.junit.Test; + +import com.google.common.collect.Maps; + +public class JobInfoConverterTest { + @Test + public void testParseToJobInstance() { + TestJob task = new TestJob(); + JobInstance instance = JobInfoConverter.parseToJobInstanceQuietly(task, Maps.<String, Output> newHashMap()); + // no exception thrown is expected + Assert.assertTrue(instance == null); + } + + @Test + public void testParseToJobStep() { + TestJob task = new TestJob(); + JobInstance.JobStep step = JobInfoConverter.parseToJobStep(task, 0, 
null); + Assert.assertEquals(step.getStatus(), JobStepStatusEnum.PENDING); + + step = JobInfoConverter.parseToJobStep(task, 0, new TestOutput()); + Assert.assertEquals(step.getStatus(), JobStepStatusEnum.FINISHED); + } + + public static class TestJob extends CubingJob { + public TestJob() { + super(); + } + + @Override + protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { + return new ExecuteResult(ExecuteResult.State.SUCCEED, ""); + } + } + + public static class TestOutput implements Output { + + @Override + public Map<String, String> getExtra() { + Map<String, String> extra = Maps.newHashMap(); + extra.put("testkey", "testval"); + return extra; + } + + @Override + public String getVerboseMsg() { + return null; + } + + @Override + public ExecutableState getState() { + return ExecutableState.SUCCEED; + } + + @Override + public long getLastModified() { + return 0; + } + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/2ee76384/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java index e84bc8d..d27b39a 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java @@ -162,22 +162,22 @@ public class JobService extends BasicService implements InitializingBean { Message msg = MsgPicker.getMsg(); switch (status) { - case DISCARDED: - return ExecutableState.DISCARDED; - case ERROR: - return ExecutableState.ERROR; - case FINISHED: - return ExecutableState.SUCCEED; - case NEW: - return ExecutableState.READY; - case PENDING: - return ExecutableState.READY; - case RUNNING: - return ExecutableState.RUNNING; - case STOPPED: - return ExecutableState.STOPPED; - default: - throw new 
BadRequestException(String.format(msg.getILLEGAL_EXECUTABLE_STATE(), status)); + case DISCARDED: + return ExecutableState.DISCARDED; + case ERROR: + return ExecutableState.ERROR; + case FINISHED: + return ExecutableState.SUCCEED; + case NEW: + return ExecutableState.READY; + case PENDING: + return ExecutableState.READY; + case RUNNING: + return ExecutableState.RUNNING; + case STOPPED: + return ExecutableState.STOPPED; + default: + throw new BadRequestException(String.format(msg.getILLEGAL_EXECUTABLE_STATE(), status)); } } @@ -185,22 +185,22 @@ public class JobService extends BasicService implements InitializingBean { Message msg = MsgPicker.getMsg(); switch (timeFilter) { - case LAST_ONE_DAY: - calendar.add(Calendar.DAY_OF_MONTH, -1); - return calendar.getTimeInMillis(); - case LAST_ONE_WEEK: - calendar.add(Calendar.WEEK_OF_MONTH, -1); - return calendar.getTimeInMillis(); - case LAST_ONE_MONTH: - calendar.add(Calendar.MONTH, -1); - return calendar.getTimeInMillis(); - case LAST_ONE_YEAR: - calendar.add(Calendar.YEAR, -1); - return calendar.getTimeInMillis(); - case ALL: - return 0; - default: - throw new BadRequestException(String.format(msg.getILLEGAL_TIME_FILTER(), timeFilter)); + case LAST_ONE_DAY: + calendar.add(Calendar.DAY_OF_MONTH, -1); + return calendar.getTimeInMillis(); + case LAST_ONE_WEEK: + calendar.add(Calendar.WEEK_OF_MONTH, -1); + return calendar.getTimeInMillis(); + case LAST_ONE_MONTH: + calendar.add(Calendar.MONTH, -1); + return calendar.getTimeInMillis(); + case LAST_ONE_YEAR: + calendar.add(Calendar.YEAR, -1); + return calendar.getTimeInMillis(); + case ALL: + return 0; + default: + throw new BadRequestException(String.format(msg.getILLEGAL_TIME_FILTER(), timeFilter)); } } @@ -218,8 +218,8 @@ public class JobService extends BasicService implements InitializingBean { } public JobInstance submitJobInternal(CubeInstance cube, TSRange tsRange, SegmentRange segRange, // - Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> 
sourcePartitionOffsetEnd, // - CubeBuildTypeEnum buildType, boolean force, String submitter) throws IOException { + Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd, // + CubeBuildTypeEnum buildType, boolean force, String submitter) throws IOException { Message msg = MsgPicker.getMsg(); if (cube.getStatus() == RealizationStatusEnum.DESCBROKEN) { @@ -233,7 +233,8 @@ public class JobService extends BasicService implements InitializingBean { try { if (buildType == CubeBuildTypeEnum.BUILD) { ISource source = SourceFactory.getSource(cube); - SourcePartition src = new SourcePartition(tsRange, segRange, sourcePartitionOffsetStart, sourcePartitionOffsetEnd); + SourcePartition src = new SourcePartition(tsRange, segRange, sourcePartitionOffsetStart, + sourcePartitionOffsetEnd); src = source.enrichSourcePartitionBeforeBuild(cube, src); newSeg = getCubeManager().appendSegment(cube, src); job = EngineFactory.createBatchCubingJob(newSeg, submitter); @@ -328,7 +329,8 @@ public class JobService extends BasicService implements InitializingBean { public JobInstance cancelJob(JobInstance job) throws IOException { aclEvaluate.checkProjectOperationPermission(job); - if (null == job.getRelatedCube() || null == getCubeManager().getCube(job.getRelatedCube()) || null == job.getRelatedSegment()) { + if (null == job.getRelatedCube() || null == getCubeManager().getCube(job.getRelatedCube()) + || null == job.getRelatedSegment()) { getExecutableManager().discardJob(job.getId()); return job; } @@ -366,8 +368,8 @@ public class JobService extends BasicService implements InitializingBean { * @return */ public List<JobInstance> searchJobs(final String cubeNameSubstring, final String projectName, - final List<JobStatusEnum> statusList, final Integer limitValue, final Integer offsetValue, - final JobTimeFilterEnum timeFilter) { + final List<JobStatusEnum> statusList, final Integer limitValue, final Integer offsetValue, + final JobTimeFilterEnum timeFilter) { 
Integer limit = (null == limitValue) ? 30 : limitValue; Integer offset = (null == offsetValue) ? 0 : offsetValue; List<JobInstance> jobs = searchJobsByCubeName(cubeNameSubstring, projectName, statusList, timeFilter); @@ -385,12 +387,12 @@ public class JobService extends BasicService implements InitializingBean { } public List<JobInstance> searchJobsByCubeName(final String cubeNameSubstring, final String projectName, - final List<JobStatusEnum> statusList, final JobTimeFilterEnum timeFilter) { + final List<JobStatusEnum> statusList, final JobTimeFilterEnum timeFilter) { return innerSearchCubingJobs(cubeNameSubstring, null, projectName, statusList, timeFilter); } public List<JobInstance> searchJobsByJobName(final String jobName, final String projectName, - final List<JobStatusEnum> statusList, final JobTimeFilterEnum timeFilter) { + final List<JobStatusEnum> statusList, final JobTimeFilterEnum timeFilter) { return innerSearchCubingJobs(null, jobName, projectName, statusList, timeFilter); } @@ -417,14 +419,19 @@ public class JobService extends BasicService implements InitializingBean { .transform(new Function<CubingJob, JobInstance>() { @Override public JobInstance apply(CubingJob cubingJob) { - return JobInfoConverter.parseToJobInstance(cubingJob, allOutputs); + return JobInfoConverter.parseToJobInstanceQuietly(cubingJob, allOutputs); + } + }).filter(new Predicate<JobInstance>() { + @Override + public boolean apply(@Nullable JobInstance input) { + return input != null; } })); } public List<CubingJob> innerSearchCubingJobs(final String cubeName, final String jobName, - final Set<ExecutableState> statusList, long timeStartInMillis, long timeEndInMillis, - final Map<String, Output> allOutputs, final boolean nameExactMatch, final String projectName) { + final Set<ExecutableState> statusList, long timeStartInMillis, long timeEndInMillis, + final Map<String, Output> allOutputs, final boolean nameExactMatch, final String projectName) { List<CubingJob> results = 
Lists.newArrayList(FluentIterable.from( getExecutableManager().getAllAbstractExecutables(timeStartInMillis, timeEndInMillis, CubingJob.class)) .filter(new Predicate<AbstractExecutable>() { @@ -464,10 +471,10 @@ public class JobService extends BasicService implements InitializingBean { public boolean apply(CubingJob executable) { try { Output output = allOutputs.get(executable.getId()); - if (output == null){ + if (output == null) { return false; } - + ExecutableState state = output.getState(); boolean ret = statusList.contains(state); return ret; @@ -497,7 +504,7 @@ public class JobService extends BasicService implements InitializingBean { } public List<CubingJob> listJobsByRealizationName(final String realizationName, final String projectName, - final Set<ExecutableState> statusList) { + final Set<ExecutableState> statusList) { return innerSearchCubingJobs(realizationName, null, statusList, 0L, Long.MAX_VALUE, getExecutableManager().getAllOutputs(), true, projectName); }