This is an automated email from the ASF dual-hosted git repository.

liyang pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 2159699d8e54588d879e0ebdbb1f2ed11380bb23 Author: Yaguang Jia <jiayagu...@foxmail.com> AuthorDate: Thu Sep 21 16:51:14 2023 +0800 KYLIN-5816 Add zk lock for dict v3 --- .../engine/spark/builder/DFDictionaryBuilder.scala | 17 +----- .../kylin/engine/spark/builder/ZKHelper.scala | 42 +++++++++++++ .../spark/builder/v3dict/DictionaryBuilder.scala | 69 ++++++++++------------ 3 files changed, 75 insertions(+), 53 deletions(-) diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/DFDictionaryBuilder.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/DFDictionaryBuilder.scala index ae95344cd0..af8399ca3a 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/DFDictionaryBuilder.scala +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/DFDictionaryBuilder.scala @@ -48,28 +48,13 @@ class DFDictionaryBuilder( changeAQEConfig(true) } - private val YARN_CLUSTER: String = "cluster" private val AQE = "spark.sql.adaptive.enabled"; private val originalAQE = ss.conf.get(AQE) - private def tryZKJaasConfiguration(): Unit = { - val config = KylinConfig.getInstanceFromEnv - if (YARN_CLUSTER.equals(config.getDeployMode)) { - val kapConfig = KapConfig.wrap(config) - if (KapConfig.FI_PLATFORM.equals(kapConfig.getKerberosPlatform) || KapConfig.TDH_PLATFORM.equals(kapConfig.getKerberosPlatform)) { - val sparkConf = ss.sparkContext.getConf - val principal = sparkConf.get("spark.kerberos.principal") - val keytab = sparkConf.get("spark.kerberos.keytab") - logInfo(s"ZKJaasConfiguration principal: $principal, keyTab: $keytab") - javax.security.auth.login.Configuration.setConfiguration(new ZKJaasConfiguration(principal, keytab)) - } - } - } - @throws[IOException] private[builder] def safeBuild(ref: TblColRef): Unit = { val sourceColumn = ref.getIdentity - tryZKJaasConfiguration() + ZKHelper.tryZKJaasConfiguration(ss) val lock: 
Lock = KylinConfig.getInstanceFromEnv.getDistributedLockFactory .getLockForCurrentThread(getLockPath(sourceColumn)) lock.lock() diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/ZKHelper.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/ZKHelper.scala new file mode 100644 index 0000000000..1fd321f64e --- /dev/null +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/ZKHelper.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kylin.engine.spark.builder + +import org.apache.kylin.common.{KapConfig, KylinConfig} +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SparkSession + +object ZKHelper extends Logging{ + private val YARN_CLUSTER: String = "cluster" + + def tryZKJaasConfiguration(ss: SparkSession): Unit = { + val config = KylinConfig.getInstanceFromEnv + if (YARN_CLUSTER.equals(config.getDeployMode)) { + val kapConfig = KapConfig.wrap(config) + if (KapConfig.FI_PLATFORM.equals(kapConfig.getKerberosPlatform) || KapConfig.TDH_PLATFORM.equals(kapConfig.getKerberosPlatform)) { + val sparkConf = ss.sparkContext.getConf + val principal = sparkConf.get("spark.kerberos.principal") + val keytab = sparkConf.get("spark.kerberos.keytab") + logInfo(s"ZKJaasConfiguration principal: $principal, keyTab: $keytab") + javax.security.auth.login.Configuration.setConfiguration(new ZKJaasConfiguration(principal, keytab)) + } + } + } + +} diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/v3dict/DictionaryBuilder.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/v3dict/DictionaryBuilder.scala index 5fd9ae1673..9938b1e00c 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/v3dict/DictionaryBuilder.scala +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/v3dict/DictionaryBuilder.scala @@ -23,6 +23,7 @@ import org.apache.hadoop.fs.Path import org.apache.kylin.common.KylinConfig import org.apache.kylin.common.exception.KylinRuntimeException import org.apache.kylin.common.util.HadoopUtil +import org.apache.kylin.engine.spark.builder.ZKHelper import org.apache.kylin.engine.spark.builder.v3dict.DictBuildMode.{V2UPGRADE, V3APPEND, V3INIT, V3UPGRADE} import org.apache.kylin.engine.spark.job.NSparkCubingUtil import org.apache.kylin.metadata.model.TblColRef @@ -40,7 +41,7 @@ import 
util.retry.blocking.RetryStrategy.RetryStrategyProducer import util.retry.blocking.{Failure, Retry, RetryStrategy, Success} import java.nio.file.Paths -import java.util.concurrent.locks.ReentrantReadWriteLock +import java.util.concurrent.locks.Lock import scala.collection.mutable.ListBuffer import scala.concurrent.duration.DurationInt import scala.util.control.NonFatal @@ -49,7 +50,6 @@ object DictionaryBuilder extends Logging { implicit val retryStrategy: RetryStrategyProducer = RetryStrategy.randomBackOff(5.seconds, 15.seconds, maxAttempts = 20) - lazy val v3dictMergeLock = new ReentrantReadWriteLock(true) def buildGlobalDict( project: String, @@ -88,12 +88,11 @@ object DictionaryBuilder extends Logging { private def transformerDictPlan( spark: SparkSession, context: DictionaryContext, - plan: LogicalPlan): DictPlanVersion = { + plan: LogicalPlan): LogicalPlan = { val dictPath = getDictionaryPathAndCheck(context) - val dictVersion = DeltaLog.forTable(spark, dictPath).snapshot.version - val dictTableDF = spark.read.format("delta").option("versionAsOf", dictVersion).load(dictPath); - val maxOffset = dictTableDF.count() + val dictTable: DeltaTable = DeltaTable.forPath(dictPath) + val maxOffset = dictTable.toDF.count() logInfo(s"Dict $dictPath item count $maxOffset") plan match { @@ -102,19 +101,19 @@ object DictionaryBuilder extends Logging { val windowSpec = org.apache.spark.sql.expressions.Window.orderBy(col(column)) val joinCondition = createColumn( EqualTo(col(column).cast(StringType).expr, - getLogicalPlan(dictTableDF).output.head)) - val filterKey = getLogicalPlan(dictTableDF).output.head.name + getLogicalPlan(dictTable.toDF).output.head)) + val filterKey = getLogicalPlan(dictTable.toDF).output.head.name val antiJoinDF = getDataFrame(spark, windowChild) .filter(col(filterKey).isNotNull) - .join(dictTableDF, + .join(dictTable.toDF, joinCondition, "left_anti") .select(col(column).cast(StringType) as "dict_key", (row_number().over(windowSpec) + 
lit(maxOffset)).cast(LongType) as "dict_value") logInfo(s"Dict logical plan : ${antiJoinDF.queryExecution.logical.treeString}") - DictPlanVersion(getLogicalPlan(antiJoinDF), dictVersion) + getLogicalPlan(antiJoinDF) - case _ => DictPlanVersion(plan, dictVersion) + case _ => plan } } @@ -186,34 +185,22 @@ object DictionaryBuilder extends Logging { } private def mergeIncrementDict(spark: SparkSession, context: DictionaryContext, plan: LogicalPlan): Unit = { - val incDictVersion = transformerDictPlan(spark, context, plan) - val incrementDictDF = getDataFrame(spark, incDictVersion.incDictPlan) - val dictPath = getDictionaryPathAndCheck(context) - if (incrementDictDF.isEmpty) { - logInfo(s"Increment dict for global dict $dictPath is empty, no need to merge.") - return - } - tryMergeIncrementDict(spark, dictPath, incDictVersion.sourceDeltaVersion, incrementDictDF) - } - - private def tryMergeIncrementDict(spark: SparkSession, dictPath: String, dictVersion: Long, - incDictDF: Dataset[Row]): Unit = { - v3dictMergeLock.writeLock().lock() + ZKHelper.tryZKJaasConfiguration(spark) + val lock: Lock = KylinConfig.getInstanceFromEnv.getDistributedLockFactory + .getLockForCurrentThread(getDictionaryLockPath(context)) + lock.lock() try { + val dictPlan = transformerDictPlan(spark, context, plan) + val incrementDictDF = getDataFrame(spark, dictPlan) + val dictPath = getDictionaryPathAndCheck(context) logInfo(s"Increment build global dict $dictPath") - val curVersion = DeltaLog.forTable(spark, dictPath).snapshot.version - if (dictVersion != curVersion) { - logInfo(s"Cur v3dict version is $curVersion, incDict is based on version $curVersion, will be retry") - throw new KylinRuntimeException(s"Cur v3dict version is $curVersion, " + - s"incDict is based on version $curVersion, will be retry") - } else { - val dictTable = DeltaTable.forPath(dictPath) - dictTable.merge(incDictDF, "1 != 1") - .whenNotMatched().insertAll() - .execute() - } + val dictTable = DeltaTable.forPath(dictPath) + 
dictTable.alias("dict") + .merge(incrementDictDF.alias("incre_dict"), "1 != 1") + .whenNotMatched().insertAll() + .execute() } finally { - v3dictMergeLock.writeLock().unlock() + lock.unlock() } } @@ -353,11 +340,19 @@ object DictionaryBuilder extends Logging { v3ditPath } + def getDictionaryLockPath(context: DictionaryContext): String = { + val dictPath = Paths.get(context.project, + HadoopUtil.GLOBAL_DICT_V3_STORAGE_ROOT, + context.dbName, + context.tableName, + context.columnName) + dictPath.toString + } + def wrapCol(ref: TblColRef): String = { NSparkCubingUtil.convertFromDot(ref.getBackTickIdentity) } } -case class DictPlanVersion(incDictPlan: LogicalPlan, sourceDeltaVersion: Long) class DictionaryContext( val project: String,