This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new 8497add  [KYLIN-4964] Receiver consumer thread should be stopped when 
encountering an unrecoverable error (#1622)
8497add is described below

commit 8497add0b8d04647ea8cf0ff2265af46104a88a0
Author: dixingxing <dixingx...@yeah.net>
AuthorDate: Fri May 14 10:52:59 2021 +0800

    [KYLIN-4964] Receiver consumer thread should be stopped when encountering 
an unrecoverable error (#1622)
    
    * Improve error handling for the streaming receiver; stop the consumer thread 
when encountering an unrecoverable error
    
    * KYLIN-4964 Receiver consumer thread should be stopped when encountering 
an unrecoverable error
    
    Co-authored-by: dixingxing <dixingx...@autohome.com.cn>
---
 .../kylin/stream/core/consumer/StreamingConsumerChannel.java  |  6 ++++++
 .../apache/kylin/stream/core/model/stats/ConsumerStats.java   | 11 +++++++++++
 .../core/storage/columnar/ColumnarMemoryStorePersister.java   |  5 ++++-
 .../stream/core/storage/columnar/ColumnarSegmentStore.java    |  3 +++
 4 files changed, 24 insertions(+), 1 deletion(-)

diff --git 
a/stream-core/src/main/java/org/apache/kylin/stream/core/consumer/StreamingConsumerChannel.java
 
b/stream-core/src/main/java/org/apache/kylin/stream/core/consumer/StreamingConsumerChannel.java
index 838793d..1372186 100644
--- 
a/stream-core/src/main/java/org/apache/kylin/stream/core/consumer/StreamingConsumerChannel.java
+++ 
b/stream-core/src/main/java/org/apache/kylin/stream/core/consumer/StreamingConsumerChannel.java
@@ -24,6 +24,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.kylin.stream.core.exception.IllegalStorageException;
 import org.apache.kylin.stream.core.exception.StreamingException;
 import org.apache.kylin.stream.core.metrics.StreamingMetrics;
 import org.apache.kylin.stream.core.model.StreamingMessage;
@@ -115,6 +116,10 @@ public class StreamingConsumerChannel implements Runnable {
                 } catch (InterruptedException ie) {
                     logger.warn("interrupted!");
                     stopped = true;
+                } catch (IllegalStorageException ise) {
+                    logger.error("Encountering unrecoverable exception, 
stopping consumer thread! {}",
+                        ise.getMessage(), ise);
+                    throw ise;
                 } catch (Exception e) {
                     long countValue = addEventErrorCnt.incrementAndGet();
                     if (countValue % 1000 < 3) {
@@ -301,6 +306,7 @@ public class StreamingConsumerChannel implements Runnable {
         stats.setPartitionConsumeStatsMap(partitionConsumeStatsMap);
         stats.setConsumeOffsetInfo(getSourceConsumeInfo());
         stats.setConsumeLag(totalLag);
+        stats.setConsumerThreadAlive(this.consumerThread.isAlive());
         return stats;
     }
 
diff --git 
a/stream-core/src/main/java/org/apache/kylin/stream/core/model/stats/ConsumerStats.java
 
b/stream-core/src/main/java/org/apache/kylin/stream/core/model/stats/ConsumerStats.java
index ab640b1..57c9d1e 100644
--- 
a/stream-core/src/main/java/org/apache/kylin/stream/core/model/stats/ConsumerStats.java
+++ 
b/stream-core/src/main/java/org/apache/kylin/stream/core/model/stats/ConsumerStats.java
@@ -47,6 +47,9 @@ public class ConsumerStats {
     @JsonProperty("consume_lag")
     private long consumeLag;
 
+    @JsonProperty("consumer_thread_alive")
+    private boolean consumerThreadAlive;
+
     public Map<Integer, PartitionConsumeStats> getPartitionConsumeStatsMap() {
         return partitionConsumeStatsMap;
     }
@@ -102,4 +105,12 @@ public class ConsumerStats {
     public void setConsumeLag(long consumeLag) {
         this.consumeLag = consumeLag;
     }
+
+    public boolean isConsumerThreadAlive() {
+        return consumerThreadAlive;
+    }
+
+    public void setConsumerThreadAlive(boolean consumerThreadAlive) {
+        this.consumerThreadAlive = consumerThreadAlive;
+    }
 }
diff --git 
a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarMemoryStorePersister.java
 
b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarMemoryStorePersister.java
index 02a6ad6..5cd1119 100644
--- 
a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarMemoryStorePersister.java
+++ 
b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarMemoryStorePersister.java
@@ -51,6 +51,7 @@ import org.apache.kylin.metadata.datatype.DataType;
 import org.apache.kylin.metadata.datatype.DataTypeSerializer;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.stream.core.exception.IllegalStorageException;
 import 
org.apache.kylin.stream.core.storage.columnar.ParsedStreamingCubeInfo.CuboidInfo;
 import org.apache.kylin.stream.core.storage.columnar.protocol.CuboidMetaInfo;
 import 
org.apache.kylin.stream.core.storage.columnar.protocol.DimDictionaryMetaInfo;
@@ -110,7 +111,9 @@ public class ColumnarMemoryStorePersister {
             logger.info("Finish persist memory store for cube:{} segment:{}, 
take: {}ms", cubeInstance.getName(),
                     segmentName, stopwatch.elapsed(MILLISECONDS));
         } catch (Exception e) {
-            logger.error("Error persist DataSegment.", e);
+            logger.error("Error persist DataSegment, deleteing fragment 
folder:{}", fragment.getFragmentFolder().getPath());
+            fragment.purge();
+            throw new IllegalStorageException("Error persist DataSegment : " + 
e.getMessage(), e);
         }
     }
 
diff --git 
a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarSegmentStore.java
 
b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarSegmentStore.java
index 85fcd2c..6598206 100644
--- 
a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarSegmentStore.java
+++ 
b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarSegmentStore.java
@@ -313,11 +313,14 @@ public class ColumnarSegmentStore implements 
IStreamingSegmentStore {
         String checkpointFragmentIDString = (String) checkpoint;
         FragmentId checkpointFragmentID = 
FragmentId.parse(checkpointFragmentIDString);
         List<DataSegmentFragment> fragments = getFragmentsFromFileSystem();
+        List<DataSegmentFragment> invalidFragments = Lists.newArrayList();
         for (DataSegmentFragment fragment : fragments) {
             if (fragment.getFragmentId().compareTo(checkpointFragmentID) > 0) {
                 fragment.purge();
+                invalidFragments.add(fragment);
             }
         }
+        this.fragments.removeAll(invalidFragments);
     }
 
     @Override

Reply via email to