This is an automated email from the ASF dual-hosted git repository. liyang pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 775efe62b525a7f9188c2c478c44e57ed4b8dc48 Author: Yinghao Lin <39019287+yhca...@users.noreply.github.com> AuthorDate: Sun Aug 13 10:56:53 2023 +0800 KYLIN-5788 Enhance global dict on flat table encoding stage logging & retry --- .../org/apache/kylin/common/KylinConfigBase.java | 1 + .../sql/catalyst/expressions/KapExpresssions.scala | 2 +- .../org/apache/spark/sql/udf/DictEncodeImpl.scala | 6 +- .../org/apache/spark/dict/NBucketDictionary.java | 7 +- .../apache/spark/dict/NGlobalDictHDFSStore.java | 92 +++++++++++++++++++++- .../org/apache/spark/dict/NGlobalDictMetaInfo.java | 6 +- .../org/apache/spark/dict/NGlobalDictStore.java | 2 +- .../org/apache/spark/dict/NGlobalDictionaryV2.java | 6 +- 8 files changed, 111 insertions(+), 11 deletions(-) diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index c1dd5379e3..9bce3f735a 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -4035,4 +4035,5 @@ public abstract class KylinConfigBase implements Serializable { public boolean isRoundDecimalZero() { return Boolean.parseBoolean(getOptional("kylin.query.round-decimal-zero", FALSE)); } + } diff --git a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/KapExpresssions.scala b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/KapExpresssions.scala index 2233e201ba..74beffaae3 100644 --- a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/KapExpresssions.scala +++ b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/catalyst/expressions/KapExpresssions.scala @@ -402,7 +402,7 @@ case class DictEncode(left: Expression, mid: Expression, right: Expression) exte | try { | int bucketId = idx % $bucketSizeTerm; | $globalDictTerm = new org.apache.spark.dict.NGlobalDictionaryV2("$dictParamsTerm"); - | $bucketDictTerm = $globalDictTerm.loadBucketDictionary(bucketId); + | $bucketDictTerm = $globalDictTerm.loadBucketDictionary(bucketId, true); | } catch (Exception e) { | throw new RuntimeException(e); | } diff --git a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/udf/DictEncodeImpl.scala b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/udf/DictEncodeImpl.scala index 8cc6e1a825..a33f29d736 100644 --- a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/udf/DictEncodeImpl.scala +++ b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/udf/DictEncodeImpl.scala @@ -17,12 +17,12 @@ */ package org.apache.spark.sql.udf -import java.util - import org.apache.spark.TaskContext import org.apache.spark.dict.{NBucketDictionary, NGlobalDictionaryV2} import org.apache.spark.util.TaskCompletionListener +import java.util + object DictEncodeImpl { @transient val cacheBucketDict: ThreadLocal[java.util.HashMap[String, NBucketDictionary]] = @@ -43,7 +43,7 @@ object DictEncodeImpl { val encodeBucketId = partitionID % bucketSize.toInt val globalDict = new NGlobalDictionaryV2(dictParams) - val cachedBucketDict = globalDict.loadBucketDictionary(encodeBucketId) + val cachedBucketDict = globalDict.loadBucketDictionary(encodeBucketId, true) DictEncodeImpl.cacheBucketDict.get.put(dictParams, cachedBucketDict) TaskContext.get().addTaskCompletionListener(new TaskCompletionListener { override def onTaskCompletion(context: TaskContext): Unit = { diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NBucketDictionary.java b/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NBucketDictionary.java index 2d2522b252..e7fb062c1d 100644 --- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NBucketDictionary.java +++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NBucketDictionary.java @@ -39,6 +39,11 @@ public class NBucketDictionary { NBucketDictionary(String baseDir, String workingDir, int bucketId, NGlobalDictMetaInfo metainfo) throws IOException { + this(baseDir, workingDir, bucketId, metainfo, false); + } + + NBucketDictionary(String baseDir, String workingDir, int bucketId, NGlobalDictMetaInfo metainfo, boolean isForColumnEncoding) + throws IOException { this.workingDir = workingDir; this.bucketId = bucketId; final NGlobalDictStore globalDictStore = NGlobalDictStoreFactory.getResourceStore(baseDir); @@ -47,7 +52,7 @@ public class NBucketDictionary { if (versions.length == 0) { this.absoluteDictMap = new Object2LongOpenHashMap<>(); } else { - this.absoluteDictMap = globalDictStore.getBucketDict(versions[versions.length - 1], metainfo, bucketId); + this.absoluteDictMap = globalDictStore.getBucketDict(versions[versions.length - 1], metainfo, bucketId, isForColumnEncoding); } this.relativeDictMap = new Object2LongOpenHashMap<>(); } diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictHDFSStore.java b/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictHDFSStore.java index 37d2375739..6aa9ade586 100644 --- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictHDFSStore.java +++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictHDFSStore.java @@ -31,7 +31,6 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; - import org.apache.hadoop.io.IOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -160,14 +159,21 @@ public class NGlobalDictHDFSStore implements NGlobalDictStore { } } - @Override - public Object2LongMap<String> getBucketDict(long version, NGlobalDictMetaInfo metaInfo, int bucketId) + public Object2LongMap<String> getBucketDict(long version, NGlobalDictMetaInfo metaInfo, int bucketId, boolean isForColumnEncoding) throws IOException { Object2LongMap<String> object2IntMap = new Object2LongOpenHashMap<>(); Path versionDir = getVersionDir(version); FileStatus[] bucketFiles = fileSystem.listStatus(versionDir, path -> path.getName().endsWith("_" + bucketId)); + // https://olapio.atlassian.net/browse/AL-8865 + // Only do check and retry when isForColumnEncoding is true + bucketFiles = checkAndRetryGetBucketFiles(metaInfo, versionDir, bucketId, isForColumnEncoding, bucketFiles); + + // Log detailed bucket files to confirm that we do not lose + // necessary files when do flat table column encoding. + logBucketFiles(metaInfo, bucketId, isForColumnEncoding, bucketFiles); + for (FileStatus file : bucketFiles) { if (file.getPath().getName().startsWith(DICT_CURR_PREFIX)) { object2IntMap.putAll(getBucketDict(file.getPath(), metaInfo.getOffset(bucketId))); @@ -180,6 +186,84 @@ public class NGlobalDictHDFSStore implements NGlobalDictStore { return object2IntMap; } + private FileStatus[] checkAndRetryGetBucketFiles(NGlobalDictMetaInfo metaInfo, Path versionDir, int bucketId, boolean isForColumnEncoding, FileStatus[] bucketFiles) throws IOException { + logger.info("[bucketId:{}][isForColumnEncoding:{}]", bucketId, isForColumnEncoding); + + if (bucketFiles.length == 0 && metaInfo.isEmptyDict()) { + logger.info("[bucketId:{}][isForColumnEncoding:{}] Empty dict, no bucket files check required", bucketId, isForColumnEncoding); + return bucketFiles; + } + + // isForColumnEncoding == true means at least 1 dict file should exist + if (isForColumnEncoding && bucketFiles.length == 0) { + logger.warn("[bucketId:{}][isForColumnEncoding:{}] Get bucket dict under strict mode but no bucket dict files found in dir {}", bucketId, isForColumnEncoding, versionDir); + tryOpenAndLogBucketDictFiles(versionDir, bucketId, bucketFiles); + final int MAX_RETRY = 3; + int retry = 1; + while (retry <= MAX_RETRY && bucketFiles.length == 0) { + logger.info("[bucketId:{}][retry:{}] Retry listing bucket dict files", bucketId, retry); + bucketFiles = fileSystem.listStatus(versionDir, path -> path.getName().endsWith("_" + bucketId)); + if (bucketFiles.length == 0) { + logger.warn("[bucketId:{}][retry:{}] Retry listing bucket dict files, no bucket dict files found", bucketId, retry); + } + retry++; + if (retry <= MAX_RETRY) { + sleep(); + } + } + if (bucketFiles.length == 0) { + // List bucket dict files failed + logger.error("[bucketId:{}] The necessary bucket dict files were not available", bucketId); + // Try open again for logging evidence + tryOpenAndLogBucketDictFiles(versionDir, bucketId, bucketFiles); + // Throw exception + throw new FileNotFoundException(String.format("[bucketId:%s] The necessary bucket dict files were not available", bucketId)); + } + } + return bucketFiles; + } + + private void logBucketFiles(NGlobalDictMetaInfo metaInfo, int bucketId, boolean isForColumnEncoding, FileStatus[] bucketFiles) { + if (bucketFiles.length == 0) { + if (metaInfo.isEmptyDict()) { + logger.info("[bucketId:{}][isForColumnEncoding:{}] Empty dict, no need to log bucket files", bucketId, isForColumnEncoding); + } else { + logger.info("[bucketId:{}][isForColumnEncoding:{}] Try listing bucketFiles, but no bucketFiles found", bucketId, isForColumnEncoding); + } + return; + } + logger.info("[bucketId:{}][isForColumnEncoding:{}] bucketFiles size: {}", bucketId, isForColumnEncoding, bucketFiles.length); + for (FileStatus file : bucketFiles) { + logger.info("[bucketId:{}][isForColumnEncoding:{}] Listing dict file: {}, file size: {}", bucketId, isForColumnEncoding, file.getPath(), file.getLen()); + } + } + + private void tryOpenAndLogBucketDictFiles(Path versionDir, int bucketId, FileStatus[] bucketFiles) { + if (bucketFiles.length == 0) { + openAndLog(new Path(versionDir, DICT_CURR_PREFIX + bucketId)); + openAndLog(new Path(versionDir, DICT_PREV_PREFIX + bucketId)); + } + } + + private void openAndLog(Path bucketDictFile) { + try { + // Do nothing, open and close + fileSystem.open(bucketDictFile).close(); + logger.info("Bucket dict file {} opened", bucketDictFile); + } catch (Exception e) { + logger.error(String.format("Error on opening bucket dict file: %s", bucketDictFile), e); + } + } + + private void sleep() { + try { + TimeUnit.MINUTES.sleep(2); + } catch (InterruptedException e) { + logger.error("Interrupted on sleep", e); + Thread.currentThread().interrupt(); + } + } + private Object2LongMap<String> getBucketDict(Path dictPath, long offset) throws IOException { Object2LongMap<String> object2IntMap = new Object2LongOpenHashMap<>(); try (FSDataInputStream is = fileSystem.open(dictPath)) { @@ -321,4 +405,6 @@ public class NGlobalDictHDFSStore implements NGlobalDictStore { } } } + + } diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictMetaInfo.java b/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictMetaInfo.java index cf0f673ff4..f7ce898a6a 100644 --- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictMetaInfo.java +++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictMetaInfo.java @@ -51,4 +51,8 @@ public class NGlobalDictMetaInfo implements Serializable { public void setBucketSize(int bucketSize) { this.bucketSize = bucketSize; } -} \ No newline at end of file + + public boolean isEmptyDict() { + return dictCount == 0; + } +} diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictStore.java b/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictStore.java index ba4d0446fb..3a31d6dfb7 100644 --- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictStore.java +++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictStore.java @@ -40,7 +40,7 @@ public interface NGlobalDictStore { NGlobalDictMetaInfo getMetaInfo(long version) throws IOException; - Object2LongMap<String> getBucketDict(long version, NGlobalDictMetaInfo metadata, int bucketId) throws IOException; + Object2LongMap<String> getBucketDict(long version, NGlobalDictMetaInfo metadata, int bucketId, boolean isForColumnEncoding) throws IOException; void writeBucketCurrDict(String workingPath, int bucketId, Object2LongMap<String> openHashMap) throws IOException; diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictionaryV2.java b/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictionaryV2.java index 1a4f8fd3bc..8f37e43db2 100644 --- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictionaryV2.java +++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictionaryV2.java @@ -72,10 +72,14 @@ public class NGlobalDictionaryV2 implements Serializable { } public NBucketDictionary loadBucketDictionary(int bucketId) throws IOException { + return loadBucketDictionary(bucketId, false); + } + + public NBucketDictionary loadBucketDictionary(int bucketId, boolean isForColumnEncoding) throws IOException { if (null == metadata) { metadata = getMetaInfo(); } - return new NBucketDictionary(baseDir, getWorkingDir(), bucketId, metadata); + return new NBucketDictionary(baseDir, getWorkingDir(), bucketId, metadata, isForColumnEncoding); } public NBucketDictionary createNewBucketDictionary() {