Revert "KYLIN-1762 discard job when no stream message" This reverts commit 1108d9eeccecbccffea0b3f9049151672196c91a.
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/4de2c0c9
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/4de2c0c9
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/4de2c0c9

Branch: refs/heads/tempmaster
Commit: 4de2c0c97bcf342323b620e58ea9279ec8aa5493
Parents: 87d957f
Author: Hongbin Ma <mahong...@apache.org>
Authored: Mon Sep 19 23:50:20 2016 +0800
Committer: Hongbin Ma <mahong...@apache.org>
Committed: Mon Sep 19 23:50:20 2016 +0800

----------------------------------------------------------------------
 .../job/execution/DefaultChainedExecutable.java |  6 ---
 .../kylin/source/kafka/SeekOffsetStep.java      | 45 +++++---------------
 2 files changed, 10 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/4de2c0c9/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
index 39a5f4f..753b389 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
@@ -88,7 +88,6 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
         boolean allSucceed = true;
         boolean hasError = false;
         boolean hasRunning = false;
-        boolean hasDiscarded = false;
         for (Executable task : jobs) {
             final ExecutableState status = task.getStatus();
             if (status == ExecutableState.ERROR) {
@@ -100,9 +99,6 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
             if (status == ExecutableState.RUNNING) {
                 hasRunning = true;
             }
-            if (status == ExecutableState.DISCARDED) {
-                hasDiscarded = true;
-            }
         }
         if (allSucceed) {
             setEndTime(System.currentTimeMillis());
@@ -114,8 +110,6 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
                 notifyUserStatusChange(executableContext, ExecutableState.ERROR);
             } else if (hasRunning) {
                 jobService.updateJobOutput(getId(), ExecutableState.RUNNING, null, null);
-            } else if (hasDiscarded) {
-                jobService.updateJobOutput(getId(), ExecutableState.DISCARDED, null, null);
             } else {
                 jobService.updateJobOutput(getId(), ExecutableState.READY, null, null);
             }

http://git-wip-us.apache.org/repos/asf/kylin/blob/4de2c0c9/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java
index 479f1b8..5dca93f 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java
@@ -17,10 +17,6 @@
  */
 package org.apache.kylin.source.kafka;

-import com.google.common.base.Function;
-import com.google.common.collect.Collections2;
-import com.google.common.collect.Maps;
-import org.apache.commons.math3.util.MathUtils;
 import org.apache.kylin.source.kafka.util.KafkaClient;
 import org.apache.kylin.source.kafka.util.KafkaOffsetMapping;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -38,7 +34,6 @@ import org.apache.kylin.source.kafka.config.KafkaConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
@@ -106,39 +101,19 @@ public class SeekOffsetStep extends AbstractExecutable {
             }
         }

-        long totalStartOffset = 0, totalEndOffset = 0;
-        for (Long v : startOffsets.values()) {
-            totalStartOffset += v;
-        }
-        for (Long v : endOffsets.values()) {
-            totalEndOffset += v;
-        }
+        KafkaOffsetMapping.saveOffsetStart(segment, startOffsets);
+        KafkaOffsetMapping.saveOffsetEnd(segment, endOffsets);

-        if (totalEndOffset > totalStartOffset) {
-            KafkaOffsetMapping.saveOffsetStart(segment, startOffsets);
-            KafkaOffsetMapping.saveOffsetEnd(segment, endOffsets);
-            segment.setName(CubeSegment.makeSegmentName(0, 0, totalStartOffset, totalEndOffset));
-            CubeUpdate cubeBuilder = new CubeUpdate(cube);
-            cubeBuilder.setToUpdateSegs(segment);
-            try {
-                cubeManager.updateCube(cubeBuilder);
-            } catch (IOException e) {
-                return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
-            }
+        segment.setName(CubeSegment.makeSegmentName(0, 0, segment.getSourceOffsetStart(), segment.getSourceOffsetEnd()));
+        CubeUpdate cubeBuilder = new CubeUpdate(cube);
+        cubeBuilder.setToUpdateSegs(segment);
+        try {
+            cubeManager.updateCube(cubeBuilder);
             return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
-        } else {
-            CubeUpdate cubeBuilder = new CubeUpdate(cube);
-            cubeBuilder.setToRemoveSegs(segment);
-            try {
-                cubeManager.updateCube(cubeBuilder);
-            } catch (IOException e) {
-                return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
-            }
-
-            return new ExecuteResult(ExecuteResult.State.DISCARDED, "No new message comes");
+        } catch (IOException e) {
+            logger.error("fail to update cube segment offset", e);
+            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
         }
-
-    }
 }