This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 417f47fa7a7712f93fa7a75ebe62ec867c91d324 Author: huangsheng <huangshen...@163.com> AuthorDate: Fri Nov 18 11:30:00 2022 +0800 KYLIN-5409 avoid permission denied job retry --- .../engine/spark/application/SparkApplication.java | 14 +- .../engine/spark/builder/SnapshotBuilder.scala | 110 ++++---- .../org/apache/spark/application/JobWorker.scala | 8 +- .../apache/spark/application/TestJobMonitor.scala | 2 +- .../apache/spark/application/TestJobWorker.scala | 286 ++++++++++++++++++++- 5 files changed, 372 insertions(+), 48 deletions(-) diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java index 9cc144902d..42117ac398 100644 --- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java +++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java @@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.AccessControlException; import org.apache.kylin.cluster.IClusterManager; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.exception.KylinException; @@ -322,11 +323,22 @@ public abstract class SparkApplication implements Application { executeFinish(); } } - protected void handleException(Exception e) throws Exception { + if (e instanceof AccessControlException) { + interceptAccessControlException(e); + } + if (e instanceof RuntimeException && e.getCause() instanceof AccessControlException) { + interceptAccessControlException(e.getCause()); + } throw e; } + // Permission exception will not be retried. Simply let the job fail. + protected void interceptAccessControlException(Throwable e) throws NoRetryException{ + logger.error("Permission denied.", e); + throw new NoRetryException("Permission denied."); + } + private SparkSession createSpark(SparkConf sparkConf) { SparkSession.Builder sessionBuilder = SparkSession.builder() .withExtensions(new AbstractFunction1<SparkSessionExtensions, BoxedUnit>() { diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SnapshotBuilder.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SnapshotBuilder.scala index b077112498..777c16d464 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SnapshotBuilder.scala +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SnapshotBuilder.scala @@ -21,6 +21,7 @@ package org.apache.kylin.engine.spark.builder import com.google.common.collect.Maps import org.apache.commons.codec.digest.DigestUtils import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter} +import org.apache.hadoop.security.AccessControlException import org.apache.kylin.common.KylinConfig.SetAndUnsetThreadLocalConfig import org.apache.kylin.common.persistence.transaction.UnitOfWork import org.apache.kylin.common.util.HadoopUtil @@ -31,6 +32,7 @@ import org.apache.kylin.engine.spark.utils.{FileNames, LogUtils} import org.apache.kylin.metadata.model.{NDataModel, NTableMetadataManager, TableDesc, TableExtDesc} import org.apache.kylin.metadata.project.NProjectManager import org.apache.kylin.source.SourceFactory +import org.apache.spark.SparkException import org.apache.spark.internal.Logging import org.apache.spark.sql.hive.utils.ResourceDetectUtils import org.apache.spark.sql._ @@ -203,6 +205,69 @@ class SnapshotBuilder(var jobId: String) extends Logging with Serializable { totalRows } + def executeParallelBuildSnapshot(ss: SparkSession, toBuildTableDesc: Set[TableDesc], baseDir: String, + snapSizeMap: ConcurrentMap[String, Result], fs: FileSystem, snapshotParallelBuildTimeoutSeconds: Int): Unit = { + + val kylinConf = KylinConfig.getInstanceFromEnv + val project = toBuildTableDesc.iterator.next.getProject + val stepCheckpoint = getStepCheckpoint(kylinConf.getJobTmpDir(project), fs) + + val service = Executors.newCachedThreadPool() + implicit val executorContext = ExecutionContext.fromExecutorService(service) + val futures = toBuildTableDesc.map(tableDesc => + Future { + var config: SetAndUnsetThreadLocalConfig = null + try { + if (stepCheckpoint.exists(_.canSkip(tableDesc))) { + logInfo(s"Skip snapshot ${tableDesc.getIdentity}") + } else { + config = KylinConfig.setAndUnsetThreadLocalConfig(kylinConf) + buildSingleSnapshotWithoutMd5(ss, tableDesc, baseDir, snapSizeMap) + // do step checkpoint + stepCheckpoint.map(_.checkpoint(tableDesc)) + } + } catch { + case exception: Exception => + logError(s"Error for build snapshot table with $tableDesc", exception) + throw exception + } finally { + if (config != null) { + config.close() + } + } + } + ) + try { + val eventualTuples = Future.sequence(futures.toList) + // only throw the first exception + ProxyThreadUtils.awaitResult(eventualTuples, snapshotParallelBuildTimeoutSeconds seconds) + } catch { + case e: SparkException => + ProxyThreadUtils.shutdown(service) + e.getCause match { + case pd: AccessControlException => + logError(s"Error for await snapshot table result due to AccessControlException", pd) + throw pd + case _ => throw e + } + case e: Exception => + ProxyThreadUtils.shutdown(service) + throw e + } + } + + def executeSerialBuildSnapshot(ss: SparkSession, toBuildTableDesc: Set[TableDesc], baseDir: String, + snapSizeMap: ConcurrentMap[String, Result], fs: FileSystem, stepCheckpoint: Option[StepCheckpointSnapshot]): Unit = { + toBuildTableDesc.foreach(tableDesc => { + if (stepCheckpoint.exists(_.canSkip(tableDesc))) { + logInfo(s"Skip snapshot ${tableDesc.getIdentity}") + } else { + buildSingleSnapshot(ss, tableDesc, baseDir, fs, snapSizeMap) + // do step checkpoint + stepCheckpoint.map(_.checkpoint(tableDesc)) + } + }) + } // scalastyle:off def executeBuildSnapshot(ss: SparkSession, toBuildTableDesc: Set[TableDesc], baseDir: String, isParallelBuild: Boolean, snapshotParallelBuildTimeoutSeconds: Int): util.Map[String, Result] = { @@ -213,50 +278,9 @@ class SnapshotBuilder(var jobId: String) extends Logging with Serializable { val stepCheckpoint = getStepCheckpoint(kylinConf.getJobTmpDir(project), fs) if (isParallelBuild) { - val service = Executors.newCachedThreadPool() - implicit val executorContext = ExecutionContext.fromExecutorService(service) - val futures = toBuildTableDesc.map(tableDesc => - Future { - var config: SetAndUnsetThreadLocalConfig = null - try { - if (stepCheckpoint.exists(_.canSkip(tableDesc))) { - logInfo(s"Skip snapshot ${tableDesc.getIdentity}") - } else { - config = KylinConfig.setAndUnsetThreadLocalConfig(kylinConf) - buildSingleSnapshotWithoutMd5(ss, tableDesc, baseDir, snapSizeMap) - // do step checkpoint - stepCheckpoint.map(_.checkpoint(tableDesc)) - } - } catch { - case exception: Exception => - logError(s"Error for build snapshot table with $tableDesc", exception) - throw exception - } finally { - if (config != null) { - config.close() - } - } - } - ) - try { - val eventualTuples = Future.sequence(futures.toList) - // only throw the first exception - ProxyThreadUtils.awaitResult(eventualTuples, snapshotParallelBuildTimeoutSeconds seconds) - } catch { - case e: Exception => - ProxyThreadUtils.shutdown(service) - throw e - } + executeParallelBuildSnapshot(ss, toBuildTableDesc, baseDir, snapSizeMap, fs, snapshotParallelBuildTimeoutSeconds) } else { - toBuildTableDesc.foreach(tableDesc => { - if (stepCheckpoint.exists(_.canSkip(tableDesc))) { - logInfo(s"Skip snapshot ${tableDesc.getIdentity}") - } else { - buildSingleSnapshot(ss, tableDesc, baseDir, fs, snapSizeMap) - // do step checkpoint - stepCheckpoint.map(_.checkpoint(tableDesc)) - } - }) + executeSerialBuildSnapshot(ss, toBuildTableDesc, baseDir, snapSizeMap, fs, stepCheckpoint) } snapSizeMap } diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorker.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorker.scala index 069ac171cc..77b8834f02 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorker.scala +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorker.scala @@ -18,6 +18,7 @@ package org.apache.spark.application + import java.util.concurrent.Executors import org.apache.kylin.engine.spark.application.SparkApplication @@ -48,7 +49,6 @@ class JobWorker(application: SparkApplication, args: Array[String], eventLoop: K execute() } - private def execute(): Unit = { pool.execute(new Runnable { override def run(): Unit = { @@ -56,6 +56,12 @@ class JobWorker(application: SparkApplication, args: Array[String], eventLoop: K application.execute(args) eventLoop.post(JobSucceeded()) } catch { + // Compatible with runtime exceptions thrown by the SparkApplication.execute(args: Array[String]) + case runtimeException: RuntimeException => + runtimeException.getCause match { + case noRetryException: NoRetryException => eventLoop.post(UnknownThrowable(noRetryException)) + case throwable: Throwable => eventLoop.post(ResourceLack(throwable)) + } case exception: NoRetryException => eventLoop.post(UnknownThrowable(exception)) case throwable: Throwable => eventLoop.post(ResourceLack(throwable)) } diff --git a/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobMonitor.scala b/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobMonitor.scala index b9a1ff87c2..1864d8749c 100644 --- a/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobMonitor.scala +++ b/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobMonitor.scala @@ -22,6 +22,7 @@ import java.util import java.util.concurrent.CountDownLatch import java.util.concurrent.atomic.AtomicBoolean import com.amazonaws.services.s3.model.AmazonS3Exception +import org.apache.hadoop.security.AccessControlException import org.apache.kylin.cluster.{AvailableResource, IClusterManager, ResourceInfo} import org.apache.kylin.common.KylinConfig import org.apache.kylin.engine.spark.job.KylinBuildEnv @@ -307,7 +308,6 @@ class TestJobMonitor extends SparderBaseFunSuite with BeforeAndAfterEach { } } - test("post JobFailed event when receive class not found event") { withEventLoop { eventLoop => Mockito.when(config.getSparkEngineMaxRetryTime).thenReturn(1) diff --git a/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobWorker.scala b/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobWorker.scala index 73035a24c3..c1cfa95387 100644 --- a/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobWorker.scala +++ b/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobWorker.scala @@ -18,9 +18,10 @@ package org.apache.spark.application -import java.util.concurrent.CountDownLatch -import java.util.concurrent.atomic.AtomicBoolean +import org.apache.hadoop.security.AccessControlException +import java.util.concurrent.{CountDownLatch, TimeUnit} +import java.util.concurrent.atomic.AtomicBoolean import org.apache.kylin.engine.spark.application.SparkApplication import org.apache.kylin.engine.spark.scheduler._ import org.apache.spark.scheduler.KylinJobEventLoop @@ -53,6 +54,54 @@ class TestJobWorker extends SparderBaseFunSuite with BeforeAndAfter { eventLoop.stop() } + test("post ResourceLack event when job failed for non-AccessControlException") { + val eventLoop = new KylinJobEventLoop + eventLoop.start() + val worker = new JobWorker(new ResourceLackJobWithNonAccessControlException(), Array.empty, eventLoop) + val latch = new CountDownLatch(2) + val receiveResourceLack = new AtomicBoolean(false) + val listener = new KylinJobListener { + override def onReceive(event: KylinJobEvent): Unit = { + if (event.isInstanceOf[ResourceLack]) { + receiveResourceLack.getAndSet(true) + } + latch.countDown() + } + } + eventLoop.registerListener(listener) + eventLoop.post(RunJob()) + // receive RunJob and ResourceLack + latch.await(30, TimeUnit.SECONDS) + assert(receiveResourceLack.get()) + eventLoop.unregisterListener(listener) + worker.stop() + eventLoop.stop() + } + + test("post ResourceLack event when job failed with runtime exception for lack of resource") { + val eventLoop = new KylinJobEventLoop + eventLoop.start() + val worker = new JobWorker(new ResourceLackJobWithRuntimeException(), Array.empty, eventLoop) + val latch = new CountDownLatch(2) + val receiveResourceLack = new AtomicBoolean(false) + val listener = new KylinJobListener { + override def onReceive(event: KylinJobEvent): Unit = { + if (event.isInstanceOf[ResourceLack]) { + receiveResourceLack.getAndSet(true) + } + latch.countDown() + } + } + eventLoop.registerListener(listener) + eventLoop.post(RunJob()) + // receive RunJob and ResourceLack + latch.await(30, TimeUnit.SECONDS) + assert(receiveResourceLack.get()) + eventLoop.unregisterListener(listener) + worker.stop() + eventLoop.stop() + } + test("post JobSucceeded event when job succeeded") { val eventLoop = new KylinJobEventLoop eventLoop.start() @@ -100,8 +149,153 @@ class TestJobWorker extends SparderBaseFunSuite with BeforeAndAfter { worker.stop() eventLoop.stop() } + + test("post Permission denied event when PermissionDenied occurred with handle Exception function") { + val eventLoop = new KylinJobEventLoop + eventLoop.start() + val worker = new JobWorker(new InterceptPermissionDeniedJob(), Array.empty, eventLoop) + val latch = new CountDownLatch(2) + val receivePermissionDenied = new AtomicBoolean(false) + val listener = new KylinJobListener { + override def onReceive(event: KylinJobEvent): Unit = { + if (event.isInstanceOf[UnknownThrowable]) { + receivePermissionDenied.getAndSet(true) + } + latch.countDown() + } + } + eventLoop.registerListener(listener) + eventLoop.post(RunJob()) + // receive RunJob and PermissionDenied + latch.await(30, TimeUnit.SECONDS) + assert(receivePermissionDenied.get()) + eventLoop.unregisterListener(listener) + worker.stop() + eventLoop.stop() + } + + test("post Permission denied event when PermissionDenied occurred with interceptAccessControlException function") { + val eventLoop = new KylinJobEventLoop + eventLoop.start() + val worker = new JobWorker(new HandlePermissionDeniedJobWithAccessControlException(), Array.empty, eventLoop) + val latch = new CountDownLatch(2) + val receivePermissionDenied = new AtomicBoolean(false) + val listener = new KylinJobListener { + override def onReceive(event: KylinJobEvent): Unit = { + if (event.isInstanceOf[UnknownThrowable]) { + receivePermissionDenied.getAndSet(true) + } + latch.countDown() + } + } + eventLoop.registerListener(listener) + eventLoop.post(RunJob()) + // receive RunJob and PermissionDenied + latch.await(30, TimeUnit.SECONDS) + assert(receivePermissionDenied.get()) + eventLoop.unregisterListener(listener) + worker.stop() + eventLoop.stop() + } + + test("post Permission denied event when PermissionDenied occurred with RuntimeException wraped") { + val eventLoop = new KylinJobEventLoop + eventLoop.start() + val worker = new JobWorker(new HandlePermissionDeniedJobWithRuntimeExceptionWraped(), Array.empty, eventLoop) + val latch = new CountDownLatch(2) + val receivePermissionDenied = new AtomicBoolean(false) + val listener = new KylinJobListener { + override def onReceive(event: KylinJobEvent): Unit = { + if (event.isInstanceOf[UnknownThrowable]) { + receivePermissionDenied.getAndSet(true) + } + latch.countDown() + } + } + eventLoop.registerListener(listener) + eventLoop.post(RunJob()) + // receive RunJob and PermissionDenied + latch.await(30, TimeUnit.SECONDS) + assert(receivePermissionDenied.get()) + eventLoop.unregisterListener(listener) + worker.stop() + eventLoop.stop() + } + + test("post ResourceLack event when job failed for lack of resource with RuntimeException wraped") { + val eventLoop = new KylinJobEventLoop + eventLoop.start() + val worker = new JobWorker(new HandleResourceLackJobWithRuntimeExceptionWraped(), Array.empty, eventLoop) + val latch = new CountDownLatch(2) + val receivePermissionDenied = new AtomicBoolean(false) + val listener = new KylinJobListener { + override def onReceive(event: KylinJobEvent): Unit = { + if (event.isInstanceOf[ResourceLack]) { + receivePermissionDenied.getAndSet(true) + } + latch.countDown() + } + } + eventLoop.registerListener(listener) + eventLoop.post(RunJob()) + // receive RunJob and PermissionDenied + latch.await(30, TimeUnit.SECONDS) + assert(receivePermissionDenied.get()) + eventLoop.unregisterListener(listener) + worker.stop() + eventLoop.stop() + } + + test("post Permission denied event when RuntimeException occurred") { + val eventLoop = new KylinJobEventLoop + eventLoop.start() + val worker = new JobWorker(new PermissionDeniedJobWithRuntimeExceptionWarped(), Array.empty, eventLoop) + val latch = new CountDownLatch(2) + val receivePermissionDenied = new AtomicBoolean(false) + val listener = new KylinJobListener { + override def onReceive(event: KylinJobEvent): Unit = { + if (event.isInstanceOf[UnknownThrowable]) { + receivePermissionDenied.getAndSet(true) + } + latch.countDown() + } + } + eventLoop.registerListener(listener) + eventLoop.post(RunJob()) + // receive RunJob and PermissionDenied + latch.await(30, TimeUnit.SECONDS) + assert(receivePermissionDenied.get()) + eventLoop.unregisterListener(listener) + worker.stop() + eventLoop.stop() + } + test("post Permission denied event when AccessControlException occurred") { + val eventLoop = new KylinJobEventLoop + eventLoop.start() + val worker = new JobWorker(new PermissionDeniedJobWithNoRetryException(), Array.empty, eventLoop) + val latch = new CountDownLatch(2) + val receivePermissionDenied = new AtomicBoolean(false) + val listener = new KylinJobListener { + override def onReceive(event: KylinJobEvent): Unit = { + if (event.isInstanceOf[UnknownThrowable]) { + receivePermissionDenied.getAndSet(true) + } + latch.countDown() + } + } + eventLoop.registerListener(listener) + eventLoop.post(RunJob()) + // receive RunJob and PermissionDenied + latch.await(30, TimeUnit.SECONDS) + assert(receivePermissionDenied.get()) + eventLoop.unregisterListener(listener) + worker.stop() + eventLoop.stop() + } } + + class UnknownThrowableJob extends SparkApplication { override def execute(args: Array[String]): Unit = { throw new NoRetryException() @@ -110,6 +304,81 @@ class UnknownThrowableJob extends SparkApplication { override protected def doExecute(): Unit = {} } +class InterceptPermissionDeniedJob extends SparkApplication { + override def execute(args: Array[String]): Unit = { + try { + throw new AccessControlException() + } catch { + case e : AccessControlException => + interceptAccessControlException(e) + } + } + override protected def doExecute(): Unit = {} +} + +class HandlePermissionDeniedJobWithAccessControlException extends SparkApplication { + override def execute(args: Array[String]): Unit = { + try { + throw new AccessControlException() + } catch { + case e: Exception => handleException(e) + } + } + override protected def doExecute(): Unit = {} +} + +class ResourceLackJobWithNonAccessControlException extends SparkApplication { + override def execute(args: Array[String]): Unit = { + try { + throw new Exception + } catch { + case e: Exception => handleException(e) + } + } + override protected def doExecute(): Unit = {} +} + +class HandlePermissionDeniedJobWithRuntimeExceptionWraped extends SparkApplication { + override def execute(args: Array[String]): Unit = { + try { + throw new AccessControlException() + } catch { + case e: Exception => handleException(new RuntimeException("PermissionDenied", e)) + } + } + override protected def doExecute(): Unit = {} +} + +class HandleResourceLackJobWithRuntimeExceptionWraped extends SparkApplication { + override def execute(args: Array[String]): Unit = { + try { + throw new Exception() + } catch { + case e: Exception => handleException(new RuntimeException("Resource Lack", e)) + } + } + override protected def doExecute(): Unit = {} +} + + +class PermissionDeniedJobWithRuntimeExceptionWarped extends SparkApplication { + override def execute(args: Array[String]): Unit = { + try { + throw new AccessControlException() + } catch { + case e : Exception => throw new RuntimeException("Error execute " + this.getClass.getName, new NoRetryException("Permission denied.")) + } + } + override protected def doExecute(): Unit = {} +} + +class PermissionDeniedJobWithNoRetryException extends SparkApplication { + override def execute(args: Array[String]): Unit = { + throw new NoRetryException("Permission Denied") + } + override protected def doExecute(): Unit = {} +} + class ResourceLackJob extends SparkApplication { override def execute(args: Array[String]): Unit = { @@ -119,6 +388,19 @@ class ResourceLackJob extends SparkApplication { override protected def doExecute(): Unit = {} } +class ResourceLackJobWithRuntimeException extends SparkApplication { + + override def execute(args: Array[String]): Unit = { + try { + throw new Exception() + } catch { + case e: Exception => throw new RuntimeException("Error execute " + this.getClass.getName, e) + } + } + + override protected def doExecute(): Unit = {} +} + class MockSucceedJob extends SparkApplication { override def execute(args: Array[String]): Unit = {}