KYLIN-2707 more logs and fault-tolerant

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/2ee76384
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/2ee76384
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/2ee76384

Branch: refs/heads/master
Commit: 2ee76384d9d2b9e432424c7439f056b9ba11b12f
Parents: 3b74cea
Author: lidongsjtu <lid...@apache.org>
Authored: Thu Sep 28 11:49:32 2017 +0800
Committer: Li Yang <liy...@apache.org>
Committed: Thu Sep 28 16:38:39 2017 +0800

----------------------------------------------------------------------
 .../java/org/apache/kylin/job/JobInstance.java  |  5 +-
 .../engine/mr/common/JobInfoConverter.java      | 32 +++++--
 .../engine/mr/common/JobInfoConverterTest.java  | 90 ++++++++++++++++++
 .../apache/kylin/rest/service/JobService.java   | 99 +++++++++++---------
 4 files changed, 172 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/2ee76384/core-job/src/main/java/org/apache/kylin/job/JobInstance.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/JobInstance.java b/core-job/src/main/java/org/apache/kylin/job/JobInstance.java
index bbbbb94..e7c0aa0 100644
--- a/core-job/src/main/java/org/apache/kylin/job/JobInstance.java
+++ b/core-job/src/main/java/org/apache/kylin/job/JobInstance.java
@@ -72,7 +72,8 @@ public class JobInstance extends RootPersistentEntity implements Comparable<JobI
 
     public JobStep getRunningStep() {
         for (JobStep step : this.getSteps()) {
-            if (step.getStatus().equals(JobStepStatusEnum.RUNNING) || step.getStatus().equals(JobStepStatusEnum.WAITING)) {
+            if (step.getStatus().equals(JobStepStatusEnum.RUNNING)
+                    || step.getStatus().equals(JobStepStatusEnum.WAITING)) {
                 return step;
             }
         }
@@ -302,7 +303,7 @@ public class JobInstance extends RootPersistentEntity implements Comparable<JobI
         private long execWaitTime;
 
         @JsonProperty("step_status")
-        private JobStepStatusEnum status;
+        private JobStepStatusEnum status = JobStepStatusEnum.PENDING;
 
         @JsonProperty("cmd_type")
         private JobStepCmdTypeEnum cmdType = JobStepCmdTypeEnum.SHELL_CMD_HADOOP;

http://git-wip-us.apache.org/repos/asf/kylin/blob/2ee76384/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java
index ec5aef1..9b8400c 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java
@@ -30,15 +30,32 @@ import org.apache.kylin.job.constant.JobStepStatusEnum;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.execution.Output;
-
-import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class JobInfoConverter {
+    private static final Logger logger = 
LoggerFactory.getLogger(JobInfoConverter.class);
+
+    public static JobInstance parseToJobInstanceQuietly(AbstractExecutable 
job, Map<String, Output> outputs) {
+        try {
+            return parseToJobInstance(job, outputs);
+        } catch (Exception e) {
+            logger.error("Failed to parse job instance: uuid={}", job, e);
+            return null;
+        }
+    }
+
     public static JobInstance parseToJobInstance(AbstractExecutable job, 
Map<String, Output> outputs) {
         if (job == null) {
+            logger.warn("job is null.");
             return null;
         }
-        Preconditions.checkState(job instanceof CubingJob, "illegal job type, 
id:" + job.getId());
+
+        if (!(job instanceof CubingJob)) {
+            logger.warn("illegal job type, id:" + job.getId());
+            return null;
+        }
+
         CubingJob cubeJob = (CubingJob) job;
         Output output = outputs.get(job.getId());
         final JobInstance result = new JobInstance();
@@ -54,7 +71,8 @@ public class JobInfoConverter {
         result.setExecStartTime(AbstractExecutable.getStartTime(output));
         result.setExecEndTime(AbstractExecutable.getEndTime(output));
         
result.setExecInterruptTime(AbstractExecutable.getInterruptTime(output));
-        
result.setDuration(AbstractExecutable.getDuration(result.getExecStartTime(), 
result.getExecEndTime(), result.getExecInterruptTime()) / 1000);
+        
result.setDuration(AbstractExecutable.getDuration(result.getExecStartTime(), 
result.getExecEndTime(),
+                result.getExecInterruptTime()) / 1000);
         for (int i = 0; i < cubeJob.getTasks().size(); ++i) {
             AbstractExecutable task = cubeJob.getTasks().get(i);
             result.addStep(parseToJobStep(task, i, outputs.get(task.getId())));
@@ -69,7 +87,7 @@ public class JobInfoConverter {
         result.setSequenceID(i);
 
         if (stepOutput == null) {
-            result.setStatus(JobStepStatusEnum.ERROR);
+            logger.warn("Cannot found output for task: id={}", task.getId());
             return result;
         }
 
@@ -86,7 +104,9 @@ public class JobInfoConverter {
         }
         if (task instanceof MapReduceExecutable) {
             result.setExecCmd(((MapReduceExecutable) 
task).getMapReduceParams());
-            
result.setExecWaitTime(AbstractExecutable.getExtraInfoAsLong(stepOutput, 
MapReduceExecutable.MAP_REDUCE_WAIT_TIME, 0L) / 1000);
+            result.setExecWaitTime(
+                    AbstractExecutable.getExtraInfoAsLong(stepOutput, 
MapReduceExecutable.MAP_REDUCE_WAIT_TIME, 0L)
+                            / 1000);
         }
         if (task instanceof HadoopShellExecutable) {
             result.setExecCmd(((HadoopShellExecutable) task).getJobParams());

http://git-wip-us.apache.org/repos/asf/kylin/blob/2ee76384/engine-mr/src/test/java/org/apache/kylin/engine/mr/common/JobInfoConverterTest.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/common/JobInfoConverterTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/common/JobInfoConverterTest.java
new file mode 100644
index 0000000..013ab0b
--- /dev/null
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/common/JobInfoConverterTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.common;
+
+import java.util.Map;
+
+import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.job.JobInstance;
+import org.apache.kylin.job.constant.JobStepStatusEnum;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.execution.Output;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+public class JobInfoConverterTest {
+    @Test
+    public void testParseToJobInstance() {
+        TestJob task = new TestJob();
+        JobInstance instance = 
JobInfoConverter.parseToJobInstanceQuietly(task, Maps.<String, Output> 
newHashMap());
+        // no exception thrown is expected
+        Assert.assertTrue(instance == null);
+    }
+
+    @Test
+    public void testParseToJobStep() {
+        TestJob task = new TestJob();
+        JobInstance.JobStep step = JobInfoConverter.parseToJobStep(task, 0, 
null);
+        Assert.assertEquals(step.getStatus(), JobStepStatusEnum.PENDING);
+
+        step = JobInfoConverter.parseToJobStep(task, 0, new TestOutput());
+        Assert.assertEquals(step.getStatus(), JobStepStatusEnum.FINISHED);
+    }
+
+    public static class TestJob extends CubingJob {
+        public TestJob() {
+            super();
+        }
+
+        @Override
+        protected ExecuteResult doWork(ExecutableContext context) throws 
ExecuteException {
+            return new ExecuteResult(ExecuteResult.State.SUCCEED, "");
+        }
+    }
+
+    public static class TestOutput implements Output {
+
+        @Override
+        public Map<String, String> getExtra() {
+            Map<String, String> extra = Maps.newHashMap();
+            extra.put("testkey", "testval");
+            return extra;
+        }
+
+        @Override
+        public String getVerboseMsg() {
+            return null;
+        }
+
+        @Override
+        public ExecutableState getState() {
+            return ExecutableState.SUCCEED;
+        }
+
+        @Override
+        public long getLastModified() {
+            return 0;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/2ee76384/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index e84bc8d..d27b39a 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -162,22 +162,22 @@ public class JobService extends BasicService implements 
InitializingBean {
         Message msg = MsgPicker.getMsg();
 
         switch (status) {
-            case DISCARDED:
-                return ExecutableState.DISCARDED;
-            case ERROR:
-                return ExecutableState.ERROR;
-            case FINISHED:
-                return ExecutableState.SUCCEED;
-            case NEW:
-                return ExecutableState.READY;
-            case PENDING:
-                return ExecutableState.READY;
-            case RUNNING:
-                return ExecutableState.RUNNING;
-            case STOPPED:
-                return ExecutableState.STOPPED;
-            default:
-                throw new 
BadRequestException(String.format(msg.getILLEGAL_EXECUTABLE_STATE(), status));
+        case DISCARDED:
+            return ExecutableState.DISCARDED;
+        case ERROR:
+            return ExecutableState.ERROR;
+        case FINISHED:
+            return ExecutableState.SUCCEED;
+        case NEW:
+            return ExecutableState.READY;
+        case PENDING:
+            return ExecutableState.READY;
+        case RUNNING:
+            return ExecutableState.RUNNING;
+        case STOPPED:
+            return ExecutableState.STOPPED;
+        default:
+            throw new 
BadRequestException(String.format(msg.getILLEGAL_EXECUTABLE_STATE(), status));
         }
     }
 
@@ -185,22 +185,22 @@ public class JobService extends BasicService implements 
InitializingBean {
         Message msg = MsgPicker.getMsg();
 
         switch (timeFilter) {
-            case LAST_ONE_DAY:
-                calendar.add(Calendar.DAY_OF_MONTH, -1);
-                return calendar.getTimeInMillis();
-            case LAST_ONE_WEEK:
-                calendar.add(Calendar.WEEK_OF_MONTH, -1);
-                return calendar.getTimeInMillis();
-            case LAST_ONE_MONTH:
-                calendar.add(Calendar.MONTH, -1);
-                return calendar.getTimeInMillis();
-            case LAST_ONE_YEAR:
-                calendar.add(Calendar.YEAR, -1);
-                return calendar.getTimeInMillis();
-            case ALL:
-                return 0;
-            default:
-                throw new 
BadRequestException(String.format(msg.getILLEGAL_TIME_FILTER(), timeFilter));
+        case LAST_ONE_DAY:
+            calendar.add(Calendar.DAY_OF_MONTH, -1);
+            return calendar.getTimeInMillis();
+        case LAST_ONE_WEEK:
+            calendar.add(Calendar.WEEK_OF_MONTH, -1);
+            return calendar.getTimeInMillis();
+        case LAST_ONE_MONTH:
+            calendar.add(Calendar.MONTH, -1);
+            return calendar.getTimeInMillis();
+        case LAST_ONE_YEAR:
+            calendar.add(Calendar.YEAR, -1);
+            return calendar.getTimeInMillis();
+        case ALL:
+            return 0;
+        default:
+            throw new 
BadRequestException(String.format(msg.getILLEGAL_TIME_FILTER(), timeFilter));
         }
     }
 
@@ -218,8 +218,8 @@ public class JobService extends BasicService implements 
InitializingBean {
     }
 
     public JobInstance submitJobInternal(CubeInstance cube, TSRange tsRange, 
SegmentRange segRange, //
-                                         Map<Integer, Long> 
sourcePartitionOffsetStart, Map<Integer, Long> sourcePartitionOffsetEnd, //
-                                         CubeBuildTypeEnum buildType, boolean 
force, String submitter) throws IOException {
+            Map<Integer, Long> sourcePartitionOffsetStart, Map<Integer, Long> 
sourcePartitionOffsetEnd, //
+            CubeBuildTypeEnum buildType, boolean force, String submitter) 
throws IOException {
         Message msg = MsgPicker.getMsg();
 
         if (cube.getStatus() == RealizationStatusEnum.DESCBROKEN) {
@@ -233,7 +233,8 @@ public class JobService extends BasicService implements 
InitializingBean {
         try {
             if (buildType == CubeBuildTypeEnum.BUILD) {
                 ISource source = SourceFactory.getSource(cube);
-                SourcePartition src = new SourcePartition(tsRange, segRange, 
sourcePartitionOffsetStart, sourcePartitionOffsetEnd);
+                SourcePartition src = new SourcePartition(tsRange, segRange, 
sourcePartitionOffsetStart,
+                        sourcePartitionOffsetEnd);
                 src = source.enrichSourcePartitionBeforeBuild(cube, src);
                 newSeg = getCubeManager().appendSegment(cube, src);
                 job = EngineFactory.createBatchCubingJob(newSeg, submitter);
@@ -328,7 +329,8 @@ public class JobService extends BasicService implements 
InitializingBean {
 
     public JobInstance cancelJob(JobInstance job) throws IOException {
         aclEvaluate.checkProjectOperationPermission(job);
-        if (null == job.getRelatedCube() || null == 
getCubeManager().getCube(job.getRelatedCube()) || null == 
job.getRelatedSegment()) {
+        if (null == job.getRelatedCube() || null == 
getCubeManager().getCube(job.getRelatedCube())
+                || null == job.getRelatedSegment()) {
             getExecutableManager().discardJob(job.getId());
             return job;
         }
@@ -366,8 +368,8 @@ public class JobService extends BasicService implements 
InitializingBean {
      * @return
      */
     public List<JobInstance> searchJobs(final String cubeNameSubstring, final 
String projectName,
-                                        final List<JobStatusEnum> statusList, 
final Integer limitValue, final Integer offsetValue,
-                                        final JobTimeFilterEnum timeFilter) {
+            final List<JobStatusEnum> statusList, final Integer limitValue, 
final Integer offsetValue,
+            final JobTimeFilterEnum timeFilter) {
         Integer limit = (null == limitValue) ? 30 : limitValue;
         Integer offset = (null == offsetValue) ? 0 : offsetValue;
         List<JobInstance> jobs = searchJobsByCubeName(cubeNameSubstring, 
projectName, statusList, timeFilter);
@@ -385,12 +387,12 @@ public class JobService extends BasicService implements 
InitializingBean {
     }
 
     public List<JobInstance> searchJobsByCubeName(final String 
cubeNameSubstring, final String projectName,
-                                                  final List<JobStatusEnum> 
statusList, final JobTimeFilterEnum timeFilter) {
+            final List<JobStatusEnum> statusList, final JobTimeFilterEnum 
timeFilter) {
         return innerSearchCubingJobs(cubeNameSubstring, null, projectName, 
statusList, timeFilter);
     }
 
     public List<JobInstance> searchJobsByJobName(final String jobName, final 
String projectName,
-                                                 final List<JobStatusEnum> 
statusList, final JobTimeFilterEnum timeFilter) {
+            final List<JobStatusEnum> statusList, final JobTimeFilterEnum 
timeFilter) {
         return innerSearchCubingJobs(null, jobName, projectName, statusList, 
timeFilter);
     }
 
@@ -417,14 +419,19 @@ public class JobService extends BasicService implements 
InitializingBean {
                                 .transform(new Function<CubingJob, 
JobInstance>() {
                                     @Override
                                     public JobInstance apply(CubingJob 
cubingJob) {
-                                        return 
JobInfoConverter.parseToJobInstance(cubingJob, allOutputs);
+                                        return 
JobInfoConverter.parseToJobInstanceQuietly(cubingJob, allOutputs);
+                                    }
+                                }).filter(new Predicate<JobInstance>() {
+                                    @Override
+                                    public boolean apply(@Nullable JobInstance 
input) {
+                                        return input != null;
                                     }
                                 }));
     }
 
     public List<CubingJob> innerSearchCubingJobs(final String cubeName, final 
String jobName,
-                                                 final Set<ExecutableState> 
statusList, long timeStartInMillis, long timeEndInMillis,
-                                                 final Map<String, Output> 
allOutputs, final boolean nameExactMatch, final String projectName) {
+            final Set<ExecutableState> statusList, long timeStartInMillis, 
long timeEndInMillis,
+            final Map<String, Output> allOutputs, final boolean 
nameExactMatch, final String projectName) {
         List<CubingJob> results = Lists.newArrayList(FluentIterable.from(
                 
getExecutableManager().getAllAbstractExecutables(timeStartInMillis, 
timeEndInMillis, CubingJob.class))
                 .filter(new Predicate<AbstractExecutable>() {
@@ -464,10 +471,10 @@ public class JobService extends BasicService implements 
InitializingBean {
                     public boolean apply(CubingJob executable) {
                         try {
                             Output output = allOutputs.get(executable.getId());
-                            if (output == null){
+                            if (output == null) {
                                 return false;
                             }
-                            
+
                             ExecutableState state = output.getState();
                             boolean ret = statusList.contains(state);
                             return ret;
@@ -497,7 +504,7 @@ public class JobService extends BasicService implements 
InitializingBean {
     }
 
     public List<CubingJob> listJobsByRealizationName(final String 
realizationName, final String projectName,
-                                                     final 
Set<ExecutableState> statusList) {
+            final Set<ExecutableState> statusList) {
         return innerSearchCubingJobs(realizationName, null, statusList, 0L, 
Long.MAX_VALUE,
                 getExecutableManager().getAllOutputs(), true, projectName);
     }

Reply via email to