This is an automated email from the ASF dual-hosted git repository. nic pushed a commit to branch 3.0.x in repository https://gitbox.apache.org/repos/asf/kylin.git
commit ee158080da687037c842c1fd807c0d1ea8a6e2df Author: XiaoxiangYu <hit_la...@126.com> AuthorDate: Wed Feb 26 00:38:19 2020 +0800 KYLIN-4353 Add regular check for cube state convert. --- .../stream/coordinator/coordinate/ReceiverClusterManager.java | 6 +++--- .../kylin/stream/core/storage/StreamingSegmentManager.java | 2 +- .../java/org/apache/kylin/stream/server/StreamingServer.java | 9 +++++++++ 3 files changed, 13 insertions(+), 4 deletions(-) diff --git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/ReceiverClusterManager.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/ReceiverClusterManager.java index 32e5b88..95604e0 100644 --- a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/ReceiverClusterManager.java +++ b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/ReceiverClusterManager.java @@ -71,11 +71,11 @@ import java.util.stream.Collectors; * <pre> * This class manage operation related to multi streaming receivers. They are often not atomic and maybe idempotent. * - * In a multi-step transcation, following steps should be thought twice: + * In a multi-step transaction, following steps should be thought twice: * 1. should fail fast or continue when exception thrown. * 2. should API(RPC) be synchronous or asynchronous - * 3. when transcation failed, will roll back always succeed - * 4. transcation should be idempotent so when it failed, it could be fixed by retry + * 3. when transaction failed, will roll back always succeed + * 4. transaction should be idempotent so when it failed, it could be fixed by retry * </pre> */ public class ReceiverClusterManager { diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/StreamingSegmentManager.java b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/StreamingSegmentManager.java index 580e054..27bfd75 100644 --- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/StreamingSegmentManager.java +++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/StreamingSegmentManager.java @@ -85,7 +85,7 @@ public class StreamingSegmentManager implements Closeable { * Any further long latency events that can't find a corresponding segment to serve the index, * the events will be put to a specific segment for long latency events only. * */ - private final long cubeDuration; + public final long cubeDuration; private final long maxCubeDuration; diff --git a/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java b/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java index b4b4123..d857293 100644 --- a/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java +++ b/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java @@ -159,10 +159,19 @@ public class StreamingServer implements ReplicaSetLeaderSelector.LeaderChangeLis @Override public void run() { Collection<StreamingSegmentManager> segmentManagers = getAllCubeSegmentManagers(); + long curr = System.currentTimeMillis(); for (StreamingSegmentManager segmentManager : segmentManagers) { CubeInstance cubeInstance = segmentManager.getCubeInstance(); String cubeName = cubeInstance.getName(); try { + Collection<StreamingCubeSegment> activeSegments = segmentManager.getActiveSegments(); + for (StreamingCubeSegment segment : activeSegments) { + long delta = curr - segment.getLastUpdateTime(); + if (curr > segment.getDateRangeEnd() && delta > segmentManager.cubeDuration) { + logger.debug("Make {} immutable because it lastUpdate[{}] exceed wait duration.", segment.getSegmentName(), segment.getLastUpdateTime()); + segmentManager.makeSegmentImmutable(segment.getSegmentName()); + } + } RetentionPolicyInfo retentionPolicyInfo = new RetentionPolicyInfo(); String policyName = cubeInstance.getConfig().getStreamingSegmentRetentionPolicy(); Map<String, String> policyProps = cubeInstance.getConfig()