This is an automated email from the ASF dual-hosted git repository.

liyang pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 2159699d8e54588d879e0ebdbb1f2ed11380bb23
Author: Yaguang Jia <jiayagu...@foxmail.com>
AuthorDate: Thu Sep 21 16:51:14 2023 +0800

    KYLIN-5816 Add zk lock for dict v3
---
 .../engine/spark/builder/DFDictionaryBuilder.scala | 17 +-----
 .../kylin/engine/spark/builder/ZKHelper.scala      | 42 +++++++++++++
 .../spark/builder/v3dict/DictionaryBuilder.scala   | 69 ++++++++++------------
 3 files changed, 75 insertions(+), 53 deletions(-)
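
In short, this change swaps the JVM-local ReentrantReadWriteLock that previously guarded v3 global-dictionary merges for a ZooKeeper-backed distributed lock from Kylin's DistributedLockFactory, so builds running on different nodes serialize their writes to the same dictionary. A minimal sketch of the resulting pattern, using only the factory API visible in the diff (the withDistributedLock helper itself is illustrative, not part of the commit):

    import java.util.concurrent.locks.Lock
    import org.apache.kylin.common.KylinConfig

    // Sketch only: take the per-dictionary distributed lock, do the work,
    // always release. The lock path would come from getDictionaryLockPath.
    def withDistributedLock[T](lockPath: String)(body: => T): T = {
      val lock: Lock = KylinConfig.getInstanceFromEnv.getDistributedLockFactory
        .getLockForCurrentThread(lockPath)
      lock.lock()
      try body finally lock.unlock()
    }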

diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/DFDictionaryBuilder.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/DFDictionaryBuilder.scala
index ae95344cd0..af8399ca3a 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/DFDictionaryBuilder.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/DFDictionaryBuilder.scala
@@ -48,28 +48,13 @@ class DFDictionaryBuilder(
     changeAQEConfig(true)
   }
 
-  private val YARN_CLUSTER: String = "cluster"
   private val AQE = "spark.sql.adaptive.enabled";
   private val originalAQE = ss.conf.get(AQE)
 
-  private def tryZKJaasConfiguration(): Unit = {
-    val config = KylinConfig.getInstanceFromEnv
-    if (YARN_CLUSTER.equals(config.getDeployMode)) {
-      val kapConfig = KapConfig.wrap(config)
-      if (KapConfig.FI_PLATFORM.equals(kapConfig.getKerberosPlatform) || KapConfig.TDH_PLATFORM.equals(kapConfig.getKerberosPlatform)) {
-        val sparkConf = ss.sparkContext.getConf
-        val principal = sparkConf.get("spark.kerberos.principal")
-        val keytab = sparkConf.get("spark.kerberos.keytab")
-        logInfo(s"ZKJaasConfiguration principal: $principal, keyTab: $keytab")
-        javax.security.auth.login.Configuration.setConfiguration(new ZKJaasConfiguration(principal, keytab))
-      }
-    }
-  }
-
   @throws[IOException]
   private[builder] def safeBuild(ref: TblColRef): Unit = {
     val sourceColumn = ref.getIdentity
-    tryZKJaasConfiguration()
+    ZKHelper.tryZKJaasConfiguration(ss)
     val lock: Lock = KylinConfig.getInstanceFromEnv.getDistributedLockFactory
       .getLockForCurrentThread(getLockPath(sourceColumn))
     lock.lock()
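
The hunk above leaves safeBuild's per-column distributed lock untouched and only delegates the Kerberos JAAS setup to the new shared helper. Roughly, the resulting flow (the try/finally release is not shown in this hunk and is assumed to follow the same pattern as mergeIncrementDict below):

    // Assumed shape of safeBuild after the refactor; the build body and the
    // finally-block release live outside the hunk shown above.
    val sourceColumn = ref.getIdentity
    ZKHelper.tryZKJaasConfiguration(ss)
    val lock: Lock = KylinConfig.getInstanceFromEnv.getDistributedLockFactory
      .getLockForCurrentThread(getLockPath(sourceColumn))
    lock.lock()
    try {
      // ... existing dictionary build work ...
    } finally lock.unlock()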
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/ZKHelper.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/ZKHelper.scala
new file mode 100644
index 0000000000..1fd321f64e
--- /dev/null
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/ZKHelper.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark.builder
+
+import org.apache.kylin.common.{KapConfig, KylinConfig}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+
+object ZKHelper extends Logging{
+  private val YARN_CLUSTER: String = "cluster"
+
+  def tryZKJaasConfiguration(ss: SparkSession): Unit = {
+    val config = KylinConfig.getInstanceFromEnv
+    if (YARN_CLUSTER.equals(config.getDeployMode)) {
+      val kapConfig = KapConfig.wrap(config)
+      if (KapConfig.FI_PLATFORM.equals(kapConfig.getKerberosPlatform) || KapConfig.TDH_PLATFORM.equals(kapConfig.getKerberosPlatform)) {
+        val sparkConf = ss.sparkContext.getConf
+        val principal = sparkConf.get("spark.kerberos.principal")
+        val keytab = sparkConf.get("spark.kerberos.keytab")
+        logInfo(s"ZKJaasConfiguration principal: $principal, keyTab: $keytab")
+        javax.security.auth.login.Configuration.setConfiguration(new ZKJaasConfiguration(principal, keytab))
+      }
+    }
+  }
+
+}
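
ZKHelper relocates the JAAS setup that DFDictionaryBuilder previously did inline, so any builder can call it before touching ZooKeeper. On yarn-cluster deployments of the FI or TDH Kerberos platforms it registers a ZKJaasConfiguration built from spark.kerberos.principal and spark.kerberos.keytab; everywhere else it is a no-op. A hedged usage sketch (the session lookup is illustrative):

    import org.apache.spark.sql.SparkSession
    import org.apache.kylin.engine.spark.builder.ZKHelper

    // Register the Kerberos JAAS entry for the ZooKeeper client before
    // acquiring any distributed lock; harmless when no Kerberos is in play.
    val ss = SparkSession.builder().getOrCreate()
    ZKHelper.tryZKJaasConfiguration(ss)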
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/v3dict/DictionaryBuilder.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/v3dict/DictionaryBuilder.scala
index 5fd9ae1673..9938b1e00c 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/v3dict/DictionaryBuilder.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/v3dict/DictionaryBuilder.scala
@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.kylin.common.KylinConfig
 import org.apache.kylin.common.exception.KylinRuntimeException
 import org.apache.kylin.common.util.HadoopUtil
+import org.apache.kylin.engine.spark.builder.ZKHelper
 import org.apache.kylin.engine.spark.builder.v3dict.DictBuildMode.{V2UPGRADE, V3APPEND, V3INIT, V3UPGRADE}
 import org.apache.kylin.engine.spark.job.NSparkCubingUtil
 import org.apache.kylin.metadata.model.TblColRef
@@ -40,7 +41,7 @@ import util.retry.blocking.RetryStrategy.RetryStrategyProducer
 import util.retry.blocking.{Failure, Retry, RetryStrategy, Success}
 
 import java.nio.file.Paths
-import java.util.concurrent.locks.ReentrantReadWriteLock
+import java.util.concurrent.locks.Lock
 import scala.collection.mutable.ListBuffer
 import scala.concurrent.duration.DurationInt
 import scala.util.control.NonFatal
@@ -49,7 +50,6 @@ object DictionaryBuilder extends Logging {
 
   implicit val retryStrategy: RetryStrategyProducer =
     RetryStrategy.randomBackOff(5.seconds, 15.seconds, maxAttempts = 20)
-  lazy val v3dictMergeLock = new ReentrantReadWriteLock(true)
 
   def buildGlobalDict(
                        project: String,
@@ -88,12 +88,11 @@ object DictionaryBuilder extends Logging {
   private def transformerDictPlan(
                                    spark: SparkSession,
                                    context: DictionaryContext,
-                                   plan: LogicalPlan): DictPlanVersion = {
+                                   plan: LogicalPlan): LogicalPlan = {
 
     val dictPath = getDictionaryPathAndCheck(context)
-    val dictVersion = DeltaLog.forTable(spark, dictPath).snapshot.version
-    val dictTableDF = spark.read.format("delta").option("versionAsOf", dictVersion).load(dictPath);
-    val maxOffset = dictTableDF.count()
+    val dictTable: DeltaTable = DeltaTable.forPath(dictPath)
+    val maxOffset = dictTable.toDF.count()
     logInfo(s"Dict $dictPath item count $maxOffset")
 
     plan match {
@@ -102,19 +101,19 @@ object DictionaryBuilder extends Logging {
         val windowSpec = org.apache.spark.sql.expressions.Window.orderBy(col(column))
         val joinCondition = createColumn(
           EqualTo(col(column).cast(StringType).expr,
-            getLogicalPlan(dictTableDF).output.head))
-        val filterKey = getLogicalPlan(dictTableDF).output.head.name
+            getLogicalPlan(dictTable.toDF).output.head))
+        val filterKey = getLogicalPlan(dictTable.toDF).output.head.name
         val antiJoinDF = getDataFrame(spark, windowChild)
           .filter(col(filterKey).isNotNull)
-          .join(dictTableDF,
+          .join(dictTable.toDF,
             joinCondition,
             "left_anti")
           .select(col(column).cast(StringType) as "dict_key",
            (row_number().over(windowSpec) + lit(maxOffset)).cast(LongType) as "dict_value")
         logInfo(s"Dict logical plan : ${antiJoinDF.queryExecution.logical.treeString}")
-        DictPlanVersion(getLogicalPlan(antiJoinDF), dictVersion)
+        getLogicalPlan(antiJoinDF)
 
-      case _ => DictPlanVersion(plan, dictVersion)
+      case _ => plan
     }
   }
 
@@ -186,34 +185,22 @@ object DictionaryBuilder extends Logging {
   }
 
   private def mergeIncrementDict(spark: SparkSession, context: DictionaryContext, plan: LogicalPlan): Unit = {
-    val incDictVersion = transformerDictPlan(spark, context, plan)
-    val incrementDictDF = getDataFrame(spark, incDictVersion.incDictPlan)
-    val dictPath = getDictionaryPathAndCheck(context)
-    if (incrementDictDF.isEmpty) {
-      logInfo(s"Increment dict for global dict $dictPath is empty, no need to merge.")
-      return
-    }
-    tryMergeIncrementDict(spark, dictPath, incDictVersion.sourceDeltaVersion, incrementDictDF)
-  }
-
-  private def tryMergeIncrementDict(spark: SparkSession, dictPath: String, dictVersion: Long,
-                                    incDictDF: Dataset[Row]): Unit = {
-    v3dictMergeLock.writeLock().lock()
+    ZKHelper.tryZKJaasConfiguration(spark)
+    val lock: Lock = KylinConfig.getInstanceFromEnv.getDistributedLockFactory
+      .getLockForCurrentThread(getDictionaryLockPath(context))
+    lock.lock()
     try {
+      val dictPlan = transformerDictPlan(spark, context, plan)
+      val incrementDictDF = getDataFrame(spark, dictPlan)
+      val dictPath = getDictionaryPathAndCheck(context)
       logInfo(s"Increment build global dict $dictPath")
-      val curVersion = DeltaLog.forTable(spark, dictPath).snapshot.version
-      if (dictVersion != curVersion) {
-        logInfo(s"Cur v3dict version is $curVersion, incDict is based on 
version $curVersion, will be retry")
-        throw new KylinRuntimeException(s"Cur v3dict version is $curVersion, " 
+
-          s"incDict is based on version $curVersion, will be retry")
-      } else {
-        val dictTable = DeltaTable.forPath(dictPath)
-        dictTable.merge(incDictDF, "1 != 1")
-          .whenNotMatched().insertAll()
-          .execute()
-      }
+      val dictTable = DeltaTable.forPath(dictPath)
+      dictTable.alias("dict")
+        .merge(incrementDictDF.alias("incre_dict"), "1 != 1")
+        .whenNotMatched().insertAll()
+        .execute()
     } finally {
-      v3dictMergeLock.writeLock().unlock()
+      lock.unlock()
     }
   }
 
@@ -353,11 +340,19 @@ object DictionaryBuilder extends Logging {
     v3ditPath
   }
 
+  def getDictionaryLockPath(context: DictionaryContext): String = {
+    val dictPath = Paths.get(context.project,
+      HadoopUtil.GLOBAL_DICT_V3_STORAGE_ROOT,
+      context.dbName,
+      context.tableName,
+      context.columnName)
+    dictPath.toString
+  }
+
   def wrapCol(ref: TblColRef): String = {
     NSparkCubingUtil.convertFromDot(ref.getBackTickIdentity)
   }
 }
-case class DictPlanVersion(incDictPlan: LogicalPlan, sourceDeltaVersion: Long)
 
 class DictionaryContext(
                          val project: String,
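
Two details of the DictionaryBuilder change are worth calling out. First, the old optimistic scheme (capture the Delta snapshot version, throw KylinRuntimeException if it moved, and rely on the randomBackOff retry strategy) is gone; holding the ZooKeeper lock across transformerDictPlan and the merge makes the version check, and the DictPlanVersion carrier class, unnecessary. Second, merge with the condition "1 != 1" is an append-only upsert: the condition never matches, so every increment row takes the whenNotMatched().insertAll() path. A self-contained sketch of that idiom against a Delta table (path and DataFrame are illustrative, not from the commit):

    import io.delta.tables.DeltaTable
    import org.apache.spark.sql.{DataFrame, SparkSession}

    // Append-only Delta merge: a condition that can never match routes every
    // source row through whenNotMatched().insertAll().
    def appendAll(spark: SparkSession, dictPath: String, increment: DataFrame): Unit = {
      DeltaTable.forPath(spark, dictPath)
        .alias("dict")
        .merge(increment.alias("incre_dict"), "1 != 1")
        .whenNotMatched().insertAll()
        .execute()
    }

The lock path itself mirrors the dictionary's storage layout (project/GLOBAL_DICT_V3_STORAGE_ROOT/db/table/column, per getDictionaryLockPath), so each column's dictionary gets its own lock.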
