This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch 3.0.x
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit ee158080da687037c842c1fd807c0d1ea8a6e2df
Author: XiaoxiangYu <hit_la...@126.com>
AuthorDate: Wed Feb 26 00:38:19 2020 +0800

    KYLIN-4353 Add regular check for cube state convert.
---
 .../stream/coordinator/coordinate/ReceiverClusterManager.java    | 6 +++---
 .../kylin/stream/core/storage/StreamingSegmentManager.java       | 2 +-
 .../java/org/apache/kylin/stream/server/StreamingServer.java     | 9 +++++++++
 3 files changed, 13 insertions(+), 4 deletions(-)

diff --git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/ReceiverClusterManager.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/ReceiverClusterManager.java
index 32e5b88..95604e0 100644
--- a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/ReceiverClusterManager.java
+++ b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/ReceiverClusterManager.java
@@ -71,11 +71,11 @@ import java.util.stream.Collectors;
  * <pre>
  * This class manage operation related to multi streaming receivers. They are often not atomic and maybe idempotent.
  *
- * In a multi-step transcation, following steps should be thought twice:
+ * In a multi-step transaction, following steps should be thought twice:
  *  1. should fail fast or continue when exception thrown.
  *  2. should API(RPC) be synchronous or asynchronous
- *  3. when transcation failed, will roll back always succeed
- *  4. transcation should be idempotent so when it failed, it could be fixed by retry
+ *  3. when transaction failed, will roll back always succeed
+ *  4. transaction should be idempotent so when it failed, it could be fixed by retry
  * </pre>
  */
 public class ReceiverClusterManager {
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/StreamingSegmentManager.java b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/StreamingSegmentManager.java
index 580e054..27bfd75 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/StreamingSegmentManager.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/StreamingSegmentManager.java
@@ -85,7 +85,7 @@ public class StreamingSegmentManager implements Closeable {
      * Any further long latency events that can't find a corresponding segment to serve the index,
      * the events will be put to a specific segment for long latency events only.
      * */
-    private final long cubeDuration;
+    public final long cubeDuration;
 
     private final long maxCubeDuration;
 
diff --git a/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java b/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java
index b4b4123..d857293 100644
--- a/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java
+++ b/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java
@@ -159,10 +159,19 @@ public class StreamingServer implements ReplicaSetLeaderSelector.LeaderChangeLis
             @Override
             public void run() {
                 Collection<StreamingSegmentManager> segmentManagers = getAllCubeSegmentManagers();
+                long curr = System.currentTimeMillis();
                 for (StreamingSegmentManager segmentManager : segmentManagers) {
                     CubeInstance cubeInstance = segmentManager.getCubeInstance();
                     String cubeName = cubeInstance.getName();
                     try {
+                        Collection<StreamingCubeSegment> activeSegments = segmentManager.getActiveSegments();
+                        for (StreamingCubeSegment segment : activeSegments) {
+                            long delta = curr - segment.getLastUpdateTime();
+                            if (curr > segment.getDateRangeEnd() && delta > segmentManager.cubeDuration) {
+                                logger.debug("Make {} immutable because it lastUpdate[{}] exceed wait duration.", segment.getSegmentName(), segment.getLastUpdateTime());
+                                segmentManager.makeSegmentImmutable(segment.getSegmentName());
+                            }
+                        }
                         RetentionPolicyInfo retentionPolicyInfo = new RetentionPolicyInfo();
                         String policyName = cubeInstance.getConfig().getStreamingSegmentRetentionPolicy();
                         Map<String, String> policyProps = cubeInstance.getConfig()

Reply via email to