This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin-on-parquet-v2 in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
     new fb94ec4  KYLIN-4818 Check lookup table duplicate key when building job

fb94ec4 is described below

commit fb94ec44cee704f79e6a1796a44b4d5c04f9699d
Author: yaqian.zhang <598593...@qq.com>
AuthorDate: Wed Dec 16 18:59:28 2020 +0800

    KYLIN-4818 Check lookup table duplicate key when building job
---
 .../engine/spark/builder/CubeSnapshotBuilder.scala   | 37 ++++++++++++++++++++++++++++++++++++-
 .../kylin/engine/spark/job/ParentSourceChooser.scala |  1 +
 2 files changed, 37 insertions(+), 1 deletion(-)

diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeSnapshotBuilder.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeSnapshotBuilder.scala
index b2f0620..d1b62f0 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeSnapshotBuilder.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/builder/CubeSnapshotBuilder.scala
@@ -31,10 +31,11 @@ import org.apache.kylin.common.util.HadoopUtil
 import org.apache.kylin.engine.spark.metadata.{SegmentInfo, TableDesc}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.hive.utils.ResourceDetectUtils
-import org.apache.spark.sql.{SparkSession}
+import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.utils.ProxyThreadUtils
 import org.apache.kylin.engine.spark.utils.SparkDataSource._
 import org.apache.kylin.engine.spark.utils.FileNames
+import org.apache.spark.sql.functions.{count, countDistinct}
 
 import scala.concurrent.duration._
 import scala.concurrent.{ExecutionContext, Future}
@@ -183,6 +184,40 @@ class CubeSnapshotBuilder extends Logging {
   }
   import org.apache.kylin.engine.spark.utils.SparkDataSource._
 
+  /**
+   * Fails fast when a lookup table joined by this segment has duplicate primary keys.
+   *
+   * Each lookup table is scanned once: a single aggregation computes both the number of
+   * keyed rows and the number of distinct primary-key tuples. Rows with a NULL in any PK
+   * column are dropped first, because `countDistinct` ignores them and would otherwise make
+   * a table with even one NULL-key row look as if it contained a duplicate key.
+   *
+   * @throws IllegalStateException if a lookup table contains a duplicate primary key
+   */
+  def checkDupKey(): Unit = {
+    seg.joindescs.foreach { joinDesc =>
+      val tableInfo = joinDesc.lookupTable
+      val lookupTableName = tableInfo.tableName
+      val lookupTablePKS = joinDesc.PKS.map(_.columnName)
+      if (lookupTablePKS.isEmpty) {
+        logWarning(s"Skip duplicate key check for lookup table $lookupTableName: no primary key columns")
+      } else {
+        // NOTE(review): NULL keys are assumed harmless because they cannot match an equi-join.
+        val stats = ss.table(tableInfo)
+          .na.drop(lookupTablePKS)
+          .agg(count("*"), countDistinct(lookupTablePKS.head, lookupTablePKS.tail: _*))
+          .head()
+        val rowCount = stats.getLong(0)
+        val distinctKeyCount = stats.getLong(1)
+        if (rowCount != distinctKeyCount) {
+          // mkString lists the column names; interpolating an Array directly prints only its JVM identity.
+          throw new IllegalStateException(s"Failed to build lookup table ${lookupTableName} snapshot for Dup key found, " +
+            s"key= ${lookupTablePKS.mkString(",")}, rows= $rowCount, distinct keys= $distinctKeyCount")
+        }
+      }
+    }
+  }
+
   def buildSnapshotWithoutMd5(tableInfo: TableDesc, baseDir: String): (String, String) = {
     val sourceData = ss.table(tableInfo)
     val tablePath = FileNames.snapshotFile(tableInfo, seg.project)
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ParentSourceChooser.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ParentSourceChooser.scala
index 7d719de..7697fd6 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ParentSourceChooser.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/ParentSourceChooser.scala
@@ -69,6 +69,7 @@ class ParentSourceChooser(
         // eg: resource detect
         // Move this to a more suitable place
         val builder = new CubeSnapshotBuilder(seg, ss)
+        builder.checkDupKey()
         seg = builder.buildSnapshot
       }
       flatTableSource = getFlatTable()