This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this 
push:
     new 914b97f  KYLIN-5011 Detect and scatter skewed data in dict encoding 
step (#1662)
914b97f is described below

commit 914b97f5cf2347030525140038d060178b93f955
Author: zhengshengjun <74281684+zhengsheng...@users.noreply.github.com>
AuthorDate: Sun Jun 27 21:34:55 2021 +0800

    KYLIN-5011 Detect and scatter skewed data in dict encoding step (#1662)
    
    Co-authored-by: Xiaoxiang Yu <x...@apache.org>
---
 .../org/apache/kylin/common/KylinConfigBase.java   | 28 ++++++++++
 examples/test_case_data/localmeta/kylin.properties |  3 +-
 .../org/apache/spark/dict/NBucketDictionary.java   | 21 +++++++-
 .../org/apache/spark/dict/NGlobalDictionary.java   |  6 ++-
 .../org/apache/spark/sql/KylinFunctions.scala      |  6 ++-
 .../catalyst/expressions/KylinExpresssions.scala   | 60 ++++++++++++++++++++-
 .../engine/spark/builder/CreateFlatTable.scala     |  5 +-
 .../engine/spark/builder/CubeTableEncoder.scala    | 61 +++++++++++++++++++---
 .../engine/spark/job/ParentSourceChooser.scala     |  2 +-
 .../engine/spark/LocalWithSparkSessionTest.java    |  2 +-
 .../engine/spark/builder/TestCreateFlatTable.scala | 10 +++-
 11 files changed, 186 insertions(+), 18 deletions(-)

diff --git 
a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java 
b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index ce5d5e8..39f8eae 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -3168,4 +3168,32 @@ public abstract class KylinConfigBase implements 
Serializable {
     public boolean rePartitionEncodedDatasetWithRowKey() {
         return 
Boolean.valueOf(getOptional("kylin.engine.spark.repartition.encoded.dataset", 
"false"));
     }
+
+    /*
+     * Detect dataset skew in dictionary encode step.
+     * */
+    public boolean detectDataSkewInDictEncodingEnabled() {
+        return 
Boolean.valueOf(getOptional("kylin.dictionary.detect.data.skew.in.encoding", 
"false"));
+    }
+
+    /*
+    * In some data skew cases, the repartition step during dictionary encoding 
will be slow.
+    * We can choose to sample from the dataset to detect skewed. This 
configuration is used to set the sample rate.
+    * */
+    public double sampleRateInEncodingSkewDetection() {
+        return 
Double.valueOf(getOptional("kylin.dictionary.detect.data.skew.sample.rate", 
"0.1"));
+    }
+
+    /*
+    * In KYLIN4, dictionaries are hashed into several buckets, column data are 
repartitioned by the same hash algorithm
+    * during encoding step too. In data skew cases, the repartition step will 
be very slow. Kylin will automatically
+    * sample from the source to detect skewed data and repartition these 
skewed data to random partitions.
+    * This configuration is used to set the skew data threshhold, valued from 
0 to 1.
+    * e.g.
+    *   if you set this value to 0.05, for each value that takes up more than 
5% percent of the total will be regarded
+    *   as skew data, as a result the skewed data will be no more than 20 
records
+    * */
+    public double skewPercentageThreshHold() {
+        return 
Double.valueOf(getOptional("kylin.dictionary.data.skew.percentage.threshhold", 
"0.05"));
+    }
 }
diff --git a/examples/test_case_data/localmeta/kylin.properties 
b/examples/test_case_data/localmeta/kylin.properties
index d17ce8c..4b015c4 100644
--- a/examples/test_case_data/localmeta/kylin.properties
+++ b/examples/test_case_data/localmeta/kylin.properties
@@ -162,4 +162,5 @@ kylin.source.jdbc.pass=
 kylin.query.auto-sparder-context=false
 
 kylin.metrics.query-cache.expire-seconds=5
-kylin.metrics.query-cache.max-entries=2
\ No newline at end of file
+kylin.metrics.query-cache.max-entries=2
+kylin.dictionary.detect.data.skew.in.encoding=true
\ No newline at end of file
diff --git 
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/dict/NBucketDictionary.java
 
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/dict/NBucketDictionary.java
index bf2a351..2e310ca 100644
--- 
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/dict/NBucketDictionary.java
+++ 
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/dict/NBucketDictionary.java
@@ -19,6 +19,12 @@ package org.apache.spark.dict;
 
 import java.io.IOException;
 
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,8 +42,9 @@ public class NBucketDictionary {
     private Object2LongMap<String> absoluteDictMap;
     // Relative dictionary needs to calculate dictionary code according to 
NGlobalDictMetaInfo's bucketOffsets
     private Object2LongMap<String> relativeDictMap;
+    private Object2LongMap<String> skewedDictMap;
 
-    NBucketDictionary(String baseDir, String workingDir, int bucketId, 
NGlobalDictMetaInfo metainfo)
+    NBucketDictionary(String baseDir, String workingDir, int bucketId, 
NGlobalDictMetaInfo metainfo, String skewDictStorageFile)
             throws IOException {
         this.workingDir = workingDir;
         this.bucketId = bucketId;
@@ -49,6 +56,15 @@ public class NBucketDictionary {
             this.absoluteDictMap = 
globalDictStore.getBucketDict(versions[versions.length - 1], metainfo, 
bucketId);
         }
         this.relativeDictMap = new Object2LongOpenHashMap<>();
+        if (!StringUtils.isEmpty(skewDictStorageFile)) {
+            FileSystem fs = FileSystem.get(new Configuration());
+            if (fs.exists(new Path(skewDictStorageFile))) {
+                Kryo kryo = new Kryo();
+                Input input = new Input(fs.open(new 
Path(skewDictStorageFile)));
+                skewedDictMap = (Object2LongMap<String>) 
kryo.readClassAndObject(input);
+                input.close();
+            }
+        }
     }
 
     NBucketDictionary(String workingDir) {
@@ -72,6 +88,9 @@ public class NBucketDictionary {
     }
 
     public long encode(Object value) {
+        if (null != skewedDictMap && 
skewedDictMap.containsKey(value.toString())) {
+            return skewedDictMap.getLong(value.toString());
+        }
         return absoluteDictMap.getLong(value.toString());
     }
 
diff --git 
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictionary.java
 
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictionary.java
index 651387d..a74022c 100644
--- 
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictionary.java
+++ 
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/dict/NGlobalDictionary.java
@@ -39,6 +39,7 @@ public class NGlobalDictionary implements Serializable {
     private String sourceTable;
     private String sourceColumn;
     private boolean isFirst = true;
+    private String skewDictStorageFile;
 
     public String getResourceDir() {
         return "/" + project + HadoopUtil.GLOBAL_DICT_STORAGE_ROOT + "/" + 
sourceTable + "/" + sourceColumn + "/";
@@ -71,13 +72,16 @@ public class NGlobalDictionary implements Serializable {
         if (metadata != null) {
             isFirst = false;
         }
+        if (dictInfo.length >= 5) {
+            skewDictStorageFile = dictInfo[4];
+        }
     }
 
     public NBucketDictionary loadBucketDictionary(int bucketId) throws 
IOException {
         if (null == metadata) {
             metadata = getMetaInfo();
         }
-        return new NBucketDictionary(baseDir, getWorkingDir(), bucketId, 
metadata);
+        return new NBucketDictionary(baseDir, getWorkingDir(), bucketId, 
metadata, skewDictStorageFile);
     }
 
     public NBucketDictionary createNewBucketDictionary() {
diff --git 
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/KylinFunctions.scala
 
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/KylinFunctions.scala
index 8e112c1..63c90dc 100644
--- 
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/KylinFunctions.scala
+++ 
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/KylinFunctions.scala
@@ -24,7 +24,7 @@ import 
org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCo
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.catalyst.expressions.{ApproxCountDistinctDecode, 
BinaryExpression,
   DictEncode, Expression, ExpressionInfo, ExpressionUtils, 
ImplicitCastInputTypes, In,
-  KylinAddMonths, Like, Literal, PreciseCountDistinctDecode, RoundBase, 
SplitPart, Sum0,
+  KylinAddMonths, Like, Literal, PreciseCountDistinctDecode, RoundBase, 
ScatterSkewData, SplitPart, Sum0,
   TimestampAdd, TimestampDiff, Truncate, UnaryExpression}
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction
 import org.apache.spark.sql.udaf.{ApproxCountDistinct, IntersectCount, 
PreciseCountDistinct}
@@ -44,6 +44,10 @@ object KylinFunctions {
     Column(DictEncode(column.expr, dictParams.expr, bucketSize.expr))
   }
 
+  def scatter_skew_data(column: Column, skewDataStorage: Column): Column = {
+    Column(ScatterSkewData(column.expr, skewDataStorage.expr))
+  }
+
   // special lit for KYLIN.
   def k_lit(literal: Any): Column = literal match {
     case c: Column => c
diff --git 
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/KylinExpresssions.scala
 
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/KylinExpresssions.scala
index a1a45fa..d25896c 100644
--- 
a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/KylinExpresssions.scala
+++ 
b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/catalyst/expressions/KylinExpresssions.scala
@@ -24,7 +24,7 @@ import org.apache.spark.dict.{NBucketDictionary, 
NGlobalDictionary}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.aggregate.DeclarativeAggregate
 import org.apache.spark.sql.catalyst.expressions.codegen.Block._
-import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenerator, 
CodegenContext, ExprCode}
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenerator, 
CodegenContext, ExprCode, FalseLiteral}
 import org.apache.spark.sql.types._
 import org.roaringbitmap.longlong.Roaring64NavigableMap
 
@@ -484,4 +484,62 @@ case class ApproxCountDistinctDecode(_left: Expression, 
_right: Expression)
   override def dataType: DataType = LongType
 
   override def prettyName: String = "approx_count_distinct_decode"
+}
+
+case class ScatterSkewData(left: Expression, right: Expression) extends 
BinaryExpression with ExpectsInputTypes {
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode) : 
ExprCode = {
+
+    val rand = ctx.addMutableState("java.util.Random", "rand")
+    val skewData = 
ctx.addMutableState("it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap",
+      "skewData")
+    val skewDataStorage = right.simpleString
+
+    val initParamsFuncName = ctx.addNewFunction(s"initParams",
+      s"""
+         | private void initParams() {
+         |   ${rand} = new java.util.Random();
+         |   com.esotericsoftware.kryo.Kryo kryo = new 
com.esotericsoftware.kryo.Kryo();
+         |   try {
+         |       org.apache.hadoop.fs.FileSystem fs = 
org.apache.hadoop.fs.FileSystem.get(new org.apache.hadoop.conf.Configuration());
+         |       if (fs.exists(new 
org.apache.hadoop.fs.Path("${skewDataStorage}"))) {
+         |           com.esotericsoftware.kryo.io.Input input = new 
com.esotericsoftware.kryo.io.Input(
+         |               fs.open(new 
org.apache.hadoop.fs.Path("${skewDataStorage}")));
+         |           ${skewData} = 
(it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap<String>) 
kryo.readClassAndObject(input);
+         |           input.close();
+         |       }
+         |   } catch (java.io.IOException e) {
+         |       throw new java.lang.RuntimeException(e);
+         |   }
+         | }
+        """.stripMargin)
+
+    val addSalt = ctx.addNewFunction(s"addSalt",
+      s"""
+         | private org.apache.spark.unsafe.types.UTF8String 
addSalt(org.apache.spark.unsafe.types.UTF8String val) {
+         |   if (null != ${skewData} && (null == val || 
${skewData}.containsKey(val.toString()))) {
+         |      return org.apache.spark.unsafe.types.UTF8String.fromString(
+         |          java.lang.Integer.toString(${rand}.nextInt()));
+         |   } else {
+         |      return val;
+         |   }
+         | }
+        """.stripMargin)
+
+    ctx.addPartitionInitializationStatement(s"$initParamsFuncName();");
+
+    val leftGen = left.genCode(ctx)
+    val rightGen = right.genCode(ctx)
+    val resultCode = s"""${ev.value} = $addSalt(${leftGen.value});"""
+
+    ev.copy(code = code"""
+        ${leftGen.code}
+        ${rightGen.code}
+        ${CodeGenerator.javaType(dataType)} ${ev.value} = 
${CodeGenerator.defaultValue(dataType)};
+        $resultCode""", isNull = FalseLiteral)
+
+  }
+
+  override def dataType: DataType = StringType
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType, 
AnyDataType)
 }
\ No newline at end of file
diff --git 
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CreateFlatTable.scala
 
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CreateFlatTable.scala
index 6e8dc22..3bfd519 100644
--- 
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CreateFlatTable.scala
+++ 
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CreateFlatTable.scala
@@ -36,7 +36,8 @@ import scala.collection.JavaConverters._
 class CreateFlatTable(val seg: SegmentInfo,
                       val toBuildTree: SpanningTree,
                       val ss: SparkSession,
-                      val sourceInfo: NBuildSourceInfo) extends Logging {
+                      val sourceInfo: NBuildSourceInfo,
+                      val jobId: String) extends Logging {
 
   import org.apache.kylin.engine.spark.builder.CreateFlatTable._
 
@@ -108,7 +109,7 @@ class CreateFlatTable(val seg: SegmentInfo,
     val matchedCols = filterCols(ds, encodeCols)
     var encodeDs = ds
     if (!matchedCols.isEmpty) {
-      encodeDs = CubeTableEncoder.encodeTable(ds, seg, matchedCols.asJava)
+      encodeDs = CubeTableEncoder.encodeTable(ds, seg, matchedCols.asJava, 
jobId)
     }
     encodeDs
   }
diff --git 
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeTableEncoder.scala
 
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeTableEncoder.scala
index d2a34a8..8a5fe20 100644
--- 
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeTableEncoder.scala
+++ 
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeTableEncoder.scala
@@ -17,8 +17,13 @@
 */
 package org.apache.kylin.engine.spark.builder
 
-import java.util
+import com.esotericsoftware.kryo.Kryo
+import com.esotericsoftware.kryo.io.Output
+import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{Path}
 
+import java.util
 import org.apache.kylin.engine.spark.builder.CubeBuilderHelper.ENCODE_SUFFIX
 import org.apache.kylin.engine.spark.job.NSparkCubingUtil._
 import org.apache.kylin.engine.spark.metadata.{ColumnDesc, SegmentInfo}
@@ -27,7 +32,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.KylinFunctions._
 import org.apache.spark.sql.functions.{col, _}
 import org.apache.spark.sql.types.StringType
-import org.apache.spark.sql.{Dataset, Row}
+import org.apache.spark.sql.{Dataset, Row, functions}
 import org.apache.spark.utils.SparkVersionUtils
 
 import scala.collection.JavaConverters._
@@ -35,7 +40,7 @@ import scala.collection.mutable._
 
 object CubeTableEncoder extends Logging {
 
-  def encodeTable(ds: Dataset[Row], seg: SegmentInfo, cols: 
util.Set[ColumnDesc]): Dataset[Row] = {
+  def encodeTable(ds: Dataset[Row], seg: SegmentInfo, cols: 
util.Set[ColumnDesc], jobId: String): Dataset[Row] = {
     if (SparkVersionUtils.isLessThanSparkVersion("2.4", true)) {
       assert(!ds.sparkSession.conf.get("spark.sql.adaptive.enabled", 
"false").toBoolean,
         "Parameter 'spark.sql.adaptive.enabled' must be false when encode 
tables.")
@@ -61,15 +66,57 @@ object CubeTableEncoder extends Logging {
         val encodeColRef = convertFromDot(ref.identity)
         val columnIndex = structType.fieldIndex(encodeColRef)
 
-        val dictParams = Array(seg.project, ref.tableAliasName, 
ref.columnName, seg.kylinconf.getHdfsWorkingDirectory)
+        var dictParams = Array(seg.project, ref.tableAliasName, 
ref.columnName, seg.kylinconf.getHdfsWorkingDirectory)
           .mkString(SEPARATOR)
         val aliasName = 
structType.apply(columnIndex).name.concat(ENCODE_SUFFIX)
-        val encodeCol = dict_encode(col(encodeColRef).cast(StringType), 
lit(dictParams), lit(bucketSize).cast(StringType)).as(aliasName)
-        val columns = partitionedDs.schema.map(ty => col(ty.name)) ++ 
Seq(encodeCol)
+        var encodeCol = dict_encode(col(encodeColRef).cast(StringType), 
lit(dictParams), lit(bucketSize).cast(StringType)).as(aliasName)
+        val columns = partitionedDs.schema.map(ty => col(ty.name))
+
+        if (seg.kylinconf.detectDataSkewInDictEncodingEnabled()) {
+          //find skewed data in dict-encoding step
+          val castEncodeColRef = col(encodeColRef).cast(StringType)
+          val sampleData = 
ds.select(castEncodeColRef).sample(seg.kylinconf.sampleRateInEncodingSkewDetection()).cache()
+          val totalCount = sampleData.count();
+          val skewDictStorage = new 
Path(seg.kylinconf.getJobTmpDir(seg.project) +
+            "/" + jobId + "/skewed_data/" + ref.identity)
+          val skewedDict = new Object2LongOpenHashMap[String]()
+          sampleData.groupBy(encodeColRef)
+            .agg(functions.count(lit(1)).alias("count_value"))
+            .filter(col("count_value") > totalCount * 
seg.kylinconf.skewPercentageThreshHold())
+            .repartition(enlargedBucketSize, castEncodeColRef)
+            .select(Seq(castEncodeColRef, encodeCol): _*)
+            .collect().foreach(row => skewedDict.put(row.getString(0), 
row.getLong(1)));
+          sampleData.unpersist()
+
+          //save skewed data dict
+          if (skewedDict.size() > 0) {
+            val kryo = new Kryo()
+            val fs = skewDictStorage.getFileSystem(new Configuration())
+            if (fs.exists(skewDictStorage)) {
+              fs.delete(skewDictStorage, true)
+            }
+            val output = new Output(fs.create(skewDictStorage))
+            kryo.writeClassAndObject(output, skewedDict)
+            output.close()
 
+            //define repartition expression: repartition skewed data to random 
partition
+            val scatterColumn = scatter_skew_data(castEncodeColRef, 
lit(skewDictStorage.toString))
+              .alias("scatter_skew_data_" + ref.columnName)
+
+            //encode cuboid table with skewed data dictionary
+            dictParams = Array(seg.project, ref.tableAliasName, 
ref.columnName, seg.kylinconf.getHdfsWorkingDirectory, skewDictStorage.toString)
+              .mkString(SEPARATOR)
+            encodeCol = dict_encode(col(encodeColRef).cast(StringType), 
lit(dictParams), lit(bucketSize).cast(StringType)).alias(aliasName)
+
+            partitionedDs = partitionedDs.select(columns ++ 
Seq(scatterColumn): _*)
+              .repartition(enlargedBucketSize, col("scatter_skew_data_" + 
ref.columnName))
+              .select(columns ++ Seq(encodeCol): _*)
+            return partitionedDs;
+          }
+        }
         partitionedDs = partitionedDs
           .repartition(enlargedBucketSize, col(encodeColRef).cast(StringType))
-          .select(columns: _*)
+          .select(columns ++ Seq(encodeCol): _*)
       }
     )
 
diff --git 
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ParentSourceChooser.scala
 
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ParentSourceChooser.scala
index bd54d0b..fdb04ab 100644
--- 
a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ParentSourceChooser.scala
+++ 
b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ParentSourceChooser.scala
@@ -185,7 +185,7 @@ class ParentSourceChooser(
     //    sourceInfo.setViewFactTablePath(viewPath)
 
     //    val needJoin = 
ParentSourceChooser.needJoinLookupTables(segInfo.getModel, toBuildTree)
-    val flatTable = new CreateFlatTable(segInfo, toBuildTree, ss, sourceInfo)
+    val flatTable = new CreateFlatTable(segInfo, toBuildTree, ss, sourceInfo, 
jobId)
     val afterJoin: Dataset[Row] = flatTable.generateDataset(needEncoding, true)
     sourceInfo.setFlatTableDS(afterJoin)
 
diff --git 
a/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/LocalWithSparkSessionTest.java
 
b/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/LocalWithSparkSessionTest.java
index cfd84aa..ae2bb15 100644
--- 
a/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/LocalWithSparkSessionTest.java
+++ 
b/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/LocalWithSparkSessionTest.java
@@ -312,7 +312,7 @@ public class LocalWithSparkSessionTest extends 
LocalFileMetadataTestCase impleme
 
         CreateFlatTable flatTable = new CreateFlatTable(
                 MetadataConverter.getSegmentInfo(segment.getCubeInstance(), 
segment.getUuid(),
-                        segment.getName(), 
segment.getStorageLocationIdentifier()), null, ss, null);
+                        segment.getName(), 
segment.getStorageLocationIdentifier()), null, ss, null, 
ss.sparkContext().applicationId());
         Dataset<Row> ds = flatTable.generateDataset(false, true);
         return ds;
     }
diff --git 
a/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/builder/TestCreateFlatTable.scala
 
b/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/builder/TestCreateFlatTable.scala
index f8dd610..df0273c 100644
--- 
a/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/builder/TestCreateFlatTable.scala
+++ 
b/kylin-spark-project/kylin-spark-engine/src/test/scala/org/apache/kylin/engine/spark/builder/TestCreateFlatTable.scala
@@ -96,9 +96,14 @@ class TestCreateFlatTable extends SparderBaseFunSuite with 
SharedSparkSession wi
     val afterJoin1 = generateFlatTable(seg1, cube, true)
     afterJoin1.collect()
 
+
     if (SPARK_VERSION.startsWith("2.4")) {
       val jobs = helper.getJobsByGroupId(groupId)
-      Assert.assertEquals(jobs.length, 15)
+      if (seg1.getConfig.detectDataSkewInDictEncodingEnabled()) {
+        Assert.assertEquals(jobs.length, 18)
+      } else {
+        Assert.assertEquals(jobs.length, 15)
+      }  
     } else if (SPARK_VERSION.startsWith("3.1")) {
       // in Spark 3.x, BroadcastExchangeExec overwrites job group ID
       val jobs = helper.getJobsByGroupId(null)
@@ -140,7 +145,8 @@ class TestCreateFlatTable extends SparderBaseFunSuite with 
SharedSparkSession wi
   private def generateFlatTable(segment: CubeSegment, cube: CubeInstance, 
needEncode: Boolean): Dataset[Row] = {
     val seg = MetadataConverter.getSegmentInfo(segment.getCubeInstance, 
segment.getUuid, segment.getName, segment.getStorageLocationIdentifier)
     val spanningTree = new 
ForestSpanningTree(JavaConversions.asJavaCollection(seg.toBuildLayouts))
-    val flatTable = new CreateFlatTable(seg, spanningTree, spark, null)
+    //for test case there is no build job id
+    val flatTable = new CreateFlatTable(seg, spanningTree, spark, null, 
spark.sparkContext.applicationId)
     val afterJoin = flatTable.generateDataset(needEncode)
     afterJoin
   }

Reply via email to