This is an automated email from the ASF dual-hosted git repository.

liyang pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 775efe62b525a7f9188c2c478c44e57ed4b8dc48
Author: Yinghao Lin <39019287+yhca...@users.noreply.github.com>
AuthorDate: Sun Aug 13 10:56:53 2023 +0800

    KYLIN-5788 Enhance global dict on flat table encoding stage logging & retry
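
    The flat table column encoding path now calls a new two-argument
    overload, loadBucketDictionary(bucketId, true). The isForColumnEncoding
    flag is threaded through NBucketDictionary and NGlobalDictStore down to
    NGlobalDictHDFSStore.getBucketDict(), which in this strict mode re-lists
    missing bucket dict files up to 3 times (sleeping 2 minutes between
    attempts), logs every bucket file it reads, and throws a
    FileNotFoundException if the files never appear. The one-argument
    overloads delegate with false, so all other callers keep the lenient
    behavior.

    A minimal caller-side sketch of the two overloads (dictParams and
    bucketId are assumed to be provided by the surrounding encode task, as
    in DictEncodeImpl):

        NGlobalDictionaryV2 dict = new NGlobalDictionaryV2(dictParams);

        // Flat table column encoding: strict mode with check-and-retry
        NBucketDictionary strict = dict.loadBucketDictionary(bucketId, true);

        // Other callers: lenient mode (delegates with isForColumnEncoding = false)
        NBucketDictionary lenient = dict.loadBucketDictionary(bucketId);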
---
 .../org/apache/kylin/common/KylinConfigBase.java   |  1 +
 .../sql/catalyst/expressions/KapExpresssions.scala |  2 +-
 .../org/apache/spark/sql/udf/DictEncodeImpl.scala  |  6 +-
 .../org/apache/spark/dict/NBucketDictionary.java   |  7 +-
 .../apache/spark/dict/NGlobalDictHDFSStore.java    | 92 +++++++++++++++++++++-
 .../org/apache/spark/dict/NGlobalDictMetaInfo.java |  6 +-
 .../org/apache/spark/dict/NGlobalDictStore.java    |  2 +-
 .../org/apache/spark/dict/NGlobalDictionaryV2.java |  6 +-
 8 files changed, 111 insertions(+), 11 deletions(-)

diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index c1dd5379e3..9bce3f735a 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -4035,4 +4035,5 @@ public abstract class KylinConfigBase implements Serializable {
     public boolean isRoundDecimalZero() {
        return Boolean.parseBoolean(getOptional("kylin.query.round-decimal-zero", FALSE));
     }
+
 }
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/KapExpresssions.scala b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/KapExpresssions.scala
index 2233e201ba..74beffaae3 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/KapExpresssions.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/KapExpresssions.scala
@@ -402,7 +402,7 @@ case class DictEncode(left: Expression, mid: Expression, right: Expression) exte
          |   try {
          |     int bucketId = idx % $bucketSizeTerm;
         |     $globalDictTerm = new org.apache.spark.dict.NGlobalDictionaryV2("$dictParamsTerm");
-         |     $bucketDictTerm = $globalDictTerm.loadBucketDictionary(bucketId);
+         |     $bucketDictTerm = $globalDictTerm.loadBucketDictionary(bucketId, true);
          |   } catch (Exception e) {
          |     throw new RuntimeException(e);
          |   }
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/udf/DictEncodeImpl.scala b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/udf/DictEncodeImpl.scala
index 8cc6e1a825..a33f29d736 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/udf/DictEncodeImpl.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/udf/DictEncodeImpl.scala
@@ -17,12 +17,12 @@
  */
 package org.apache.spark.sql.udf
 
-import java.util
-
 import org.apache.spark.TaskContext
 import org.apache.spark.dict.{NBucketDictionary, NGlobalDictionaryV2}
 import org.apache.spark.util.TaskCompletionListener
 
+import java.util
+
 object DictEncodeImpl {
 
  @transient val cacheBucketDict: ThreadLocal[java.util.HashMap[String, NBucketDictionary]] =
@@ -43,7 +43,7 @@ object DictEncodeImpl {
     val encodeBucketId = partitionID % bucketSize.toInt
     val globalDict = new NGlobalDictionaryV2(dictParams)
 
-    val cachedBucketDict = globalDict.loadBucketDictionary(encodeBucketId)
+    val cachedBucketDict = globalDict.loadBucketDictionary(encodeBucketId, true)
     DictEncodeImpl.cacheBucketDict.get.put(dictParams, cachedBucketDict)
     TaskContext.get().addTaskCompletionListener(new TaskCompletionListener {
       override def onTaskCompletion(context: TaskContext): Unit = {
diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NBucketDictionary.java b/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NBucketDictionary.java
index 2d2522b252..e7fb062c1d 100644
--- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NBucketDictionary.java
+++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NBucketDictionary.java
@@ -39,6 +39,11 @@ public class NBucketDictionary {
 
    NBucketDictionary(String baseDir, String workingDir, int bucketId, NGlobalDictMetaInfo metainfo)
             throws IOException {
+        this(baseDir, workingDir, bucketId, metainfo, false);
+    }
+
+    NBucketDictionary(String baseDir, String workingDir, int bucketId, NGlobalDictMetaInfo metainfo, boolean isForColumnEncoding)
+            throws IOException {
         this.workingDir = workingDir;
         this.bucketId = bucketId;
        final NGlobalDictStore globalDictStore = NGlobalDictStoreFactory.getResourceStore(baseDir);
@@ -47,7 +52,7 @@ public class NBucketDictionary {
         if (versions.length == 0) {
             this.absoluteDictMap = new Object2LongOpenHashMap<>();
         } else {
-            this.absoluteDictMap = globalDictStore.getBucketDict(versions[versions.length - 1], metainfo, bucketId);
+            this.absoluteDictMap = globalDictStore.getBucketDict(versions[versions.length - 1], metainfo, bucketId, isForColumnEncoding);
         }
         this.relativeDictMap = new Object2LongOpenHashMap<>();
     }
diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictHDFSStore.java b/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictHDFSStore.java
index 37d2375739..6aa9ade586 100644
--- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictHDFSStore.java
+++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictHDFSStore.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-
 import org.apache.hadoop.io.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -160,14 +159,21 @@ public class NGlobalDictHDFSStore implements NGlobalDictStore {
         }
     }
 
-
     @Override
-    public Object2LongMap<String> getBucketDict(long version, NGlobalDictMetaInfo metaInfo, int bucketId)
+    public Object2LongMap<String> getBucketDict(long version, NGlobalDictMetaInfo metaInfo, int bucketId, boolean isForColumnEncoding)
             throws IOException {
         Object2LongMap<String> object2IntMap = new Object2LongOpenHashMap<>();
         Path versionDir = getVersionDir(version);
        FileStatus[] bucketFiles = fileSystem.listStatus(versionDir, path -> path.getName().endsWith("_" + bucketId));
 
+        // https://olapio.atlassian.net/browse/AL-8865
+        // Only check and retry when isForColumnEncoding is true
+        bucketFiles = checkAndRetryGetBucketFiles(metaInfo, versionDir, bucketId, isForColumnEncoding, bucketFiles);
+
+        // Log the bucket files in detail to confirm that we do not lose
+        // necessary files when doing flat table column encoding.
+        logBucketFiles(metaInfo, bucketId, isForColumnEncoding, bucketFiles);
+
         for (FileStatus file : bucketFiles) {
             if (file.getPath().getName().startsWith(DICT_CURR_PREFIX)) {
                object2IntMap.putAll(getBucketDict(file.getPath(), metaInfo.getOffset(bucketId)));
@@ -180,6 +186,84 @@ public class NGlobalDictHDFSStore implements NGlobalDictStore {
         return object2IntMap;
     }
 
+    private FileStatus[] checkAndRetryGetBucketFiles(NGlobalDictMetaInfo metaInfo, Path versionDir, int bucketId, boolean isForColumnEncoding, FileStatus[] bucketFiles) throws IOException {
+        logger.info("[bucketId:{}][isForColumnEncoding:{}]", bucketId, isForColumnEncoding);
+
+        if (bucketFiles.length == 0 && metaInfo.isEmptyDict()) {
+            logger.info("[bucketId:{}][isForColumnEncoding:{}] Empty dict, no bucket files check required", bucketId, isForColumnEncoding);
+            return bucketFiles;
+        }
+
+        // isForColumnEncoding == true means at least 1 dict file should exist
+        if (isForColumnEncoding && bucketFiles.length == 0) {
+            logger.warn("[bucketId:{}][isForColumnEncoding:{}] Get bucket dict under strict mode but no bucket dict files found in dir {}", bucketId, isForColumnEncoding, versionDir);
+            tryOpenAndLogBucketDictFiles(versionDir, bucketId, bucketFiles);
+            final int MAX_RETRY = 3;
+            int retry = 1;
+            while (retry <= MAX_RETRY && bucketFiles.length == 0) {
+                logger.info("[bucketId:{}][retry:{}] Retry listing bucket dict files", bucketId, retry);
+                bucketFiles = fileSystem.listStatus(versionDir, path -> path.getName().endsWith("_" + bucketId));
+                if (bucketFiles.length == 0) {
+                    logger.warn("[bucketId:{}][retry:{}] Retry listing bucket dict files, no bucket dict files found", bucketId, retry);
+                }
+                retry++;
+                if (retry <= MAX_RETRY) {
+                    sleep();
+                }
+            }
+            if (bucketFiles.length == 0) {
+                // List bucket dict files failed
+                logger.error("[bucketId:{}] The necessary bucket dict files were not available", bucketId);
+                // Try open again for logging evidence
+                tryOpenAndLogBucketDictFiles(versionDir, bucketId, bucketFiles);
+                // Throw exception
+                throw new FileNotFoundException(String.format("[bucketId:%s] The necessary bucket dict files were not available", bucketId));
+            }
+        }
+        return bucketFiles;
+    }
+
+    private void logBucketFiles(NGlobalDictMetaInfo metaInfo, int bucketId, boolean isForColumnEncoding, FileStatus[] bucketFiles) {
+        if (bucketFiles.length == 0) {
+            if (metaInfo.isEmptyDict()) {
+                logger.info("[bucketId:{}][isForColumnEncoding:{}] Empty dict, no need to log bucket files", bucketId, isForColumnEncoding);
+            } else {
+                logger.info("[bucketId:{}][isForColumnEncoding:{}] Try listing bucketFiles, but no bucketFiles found", bucketId, isForColumnEncoding);
+            }
+            return;
+        }
+        logger.info("[bucketId:{}][isForColumnEncoding:{}] bucketFiles size: {}", bucketId, isForColumnEncoding, bucketFiles.length);
+        for (FileStatus file : bucketFiles) {
+            logger.info("[bucketId:{}][isForColumnEncoding:{}] Listing dict file: {}, file size: {}", bucketId, isForColumnEncoding, file.getPath(), file.getLen());
+        }
+    }
+
+    private void tryOpenAndLogBucketDictFiles(Path versionDir, int bucketId, FileStatus[] bucketFiles) {
+        if (bucketFiles.length == 0) {
+            openAndLog(new Path(versionDir, DICT_CURR_PREFIX + bucketId));
+            openAndLog(new Path(versionDir, DICT_PREV_PREFIX + bucketId));
+        }
+    }
+
+    private void openAndLog(Path bucketDictFile) {
+        try {
+            // Do nothing, open and close
+            fileSystem.open(bucketDictFile).close();
+            logger.info("Bucket dict file {} opened", bucketDictFile);
+        } catch (Exception e) {
+            logger.error(String.format("Error on opening bucket dict file: %s", bucketDictFile), e);
+        }
+    }
+
+    private void sleep() {
+        try {
+            TimeUnit.MINUTES.sleep(2);
+        } catch (InterruptedException e) {
+            logger.error("Interrupted on sleep", e);
+            Thread.currentThread().interrupt();
+        }
+    }
+
    private Object2LongMap<String> getBucketDict(Path dictPath, long offset) throws IOException {
         Object2LongMap<String> object2IntMap = new Object2LongOpenHashMap<>();
         try (FSDataInputStream is = fileSystem.open(dictPath)) {
@@ -321,4 +405,6 @@ public class NGlobalDictHDFSStore implements NGlobalDictStore {
             }
         }
     }
+
+
 }
diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictMetaInfo.java b/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictMetaInfo.java
index cf0f673ff4..f7ce898a6a 100644
--- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictMetaInfo.java
+++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictMetaInfo.java
@@ -51,4 +51,8 @@ public class NGlobalDictMetaInfo implements Serializable {
     public void setBucketSize(int bucketSize) {
         this.bucketSize = bucketSize;
     }
-}
\ No newline at end of file
+
+    public boolean isEmptyDict() {
+        return dictCount == 0;
+    }
+}
diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictStore.java b/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictStore.java
index ba4d0446fb..3a31d6dfb7 100644
--- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictStore.java
+++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictStore.java
@@ -40,7 +40,7 @@ public interface NGlobalDictStore {
 
     NGlobalDictMetaInfo getMetaInfo(long version) throws IOException;
 
-    Object2LongMap<String> getBucketDict(long version, NGlobalDictMetaInfo metadata, int bucketId) throws IOException;
+    Object2LongMap<String> getBucketDict(long version, NGlobalDictMetaInfo metadata, int bucketId, boolean isForColumnEncoding) throws IOException;
 
    void writeBucketCurrDict(String workingPath, int bucketId, Object2LongMap<String> openHashMap) throws IOException;
 
diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictionaryV2.java b/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictionaryV2.java
index 1a4f8fd3bc..8f37e43db2 100644
--- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictionaryV2.java
+++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictionaryV2.java
@@ -72,10 +72,14 @@ public class NGlobalDictionaryV2 implements Serializable {
     }
 
    public NBucketDictionary loadBucketDictionary(int bucketId) throws IOException {
+        return loadBucketDictionary(bucketId, false);
+    }
+
+    public NBucketDictionary loadBucketDictionary(int bucketId, boolean isForColumnEncoding) throws IOException {
         if (null == metadata) {
             metadata = getMetaInfo();
         }
-        return new NBucketDictionary(baseDir, getWorkingDir(), bucketId, metadata);
+        return new NBucketDictionary(baseDir, getWorkingDir(), bucketId, metadata, isForColumnEncoding);
     }
 
     public NBucketDictionary createNewBucketDictionary() {
