This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit b34babb851439bca4a068aed21d2ba5d5f40914f
Author: huangsheng <huangshen...@163.com>
AuthorDate: Thu Nov 24 15:54:40 2022 +0800

    KYLIN-5409 avoid permission denied job retry
---
 .../engine/spark/application/SparkApplication.java |  17 ++++
 .../apache/spark/application/TestJobWorker.scala   | 110 +++++++++++++++++++++
 2 files changed, 127 insertions(+)

diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
index 42117ac398..5bc553d353 100644
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
@@ -60,6 +60,7 @@ import org.apache.kylin.metadata.model.PartitionDesc;
 import org.apache.kylin.query.pushdown.SparkSubmitter;
 import org.apache.kylin.query.util.PushDownUtil;
 import org.apache.spark.SparkConf;
+import org.apache.spark.SparkException;
 import org.apache.spark.application.NoRetryException;
 import org.apache.spark.launcher.SparkLauncher;
 import org.apache.spark.sql.KylinSession;
@@ -323,16 +324,32 @@ public abstract class SparkApplication implements Application {
             executeFinish();
         }
     }
+
     protected void handleException(Exception e) throws Exception {
         if (e instanceof AccessControlException) {
             interceptAccessControlException(e);
         }
         if (e instanceof RuntimeException && e.getCause() instanceof AccessControlException) {
             interceptAccessControlException(e.getCause());
+        } else if (e instanceof RuntimeException && e.getCause() instanceof SparkException) {
+            Throwable rootCause = extractRealRootCauseFromSparkException(e);
+            if (rootCause instanceof AccessControlException) {
+                interceptAccessControlException(e);
+            }
         }
         throw e;
     }
+
+    // Extract the real root exception that caused the Spark job to fail.
+    // For example, intercept Spark jobs that fail due to a permission exception, to prevent unnecessary retries from wasting resources.
+    protected Throwable extractRealRootCauseFromSparkException(Exception e) {
+        Throwable rootCause = e.getCause();
+        while (rootCause instanceof SparkException) {
+            rootCause = rootCause.getCause();
+        }
+        return rootCause;
+    }
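For context on the hunk above, a minimal standalone sketch of the same cause-chain unwrapping; this is not part of the commit, the class name is hypothetical, and it assumes Hadoop's AccessControlException plus spark-core and hadoop-common on the classpath:

import org.apache.hadoop.security.AccessControlException;
import org.apache.spark.SparkException;

public class RootCauseSketch {
    // Walk past any number of SparkException wrappers and return the first
    // non-SparkException cause (may be null if the chain ends with a SparkException).
    static Throwable unwrap(Exception e) {
        Throwable rootCause = e.getCause();
        while (rootCause instanceof SparkException) {
            rootCause = rootCause.getCause();
        }
        return rootCause;
    }

    public static void main(String[] args) {
        Exception wrapped = new RuntimeException(
                new SparkException("Exception thrown in awaitResult",
                        new SparkException("Job aborted", new AccessControlException("Permission denied"))));
        // Prints the AccessControlException, i.e. the failure the job should not retry.
        System.out.println(unwrap(wrapped));
    }
}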
+    // Permission exception will not be retried. Simply let the job fail.
     protected void interceptAccessControlException(Throwable e) throws NoRetryException{
         logger.error("Permission denied.", e);
diff --git a/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobWorker.scala b/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobWorker.scala
index c1cfa95387..06284a83e3 100644
--- a/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobWorker.scala
+++ b/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobWorker.scala
@@ -24,6 +24,7 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.concurrent.atomic.AtomicBoolean
 import org.apache.kylin.engine.spark.application.SparkApplication
 import org.apache.kylin.engine.spark.scheduler._
+import org.apache.spark.SparkException
 import org.apache.spark.scheduler.KylinJobEventLoop
 import org.apache.spark.sql.common.SparderBaseFunSuite
 import org.scalatest.BeforeAndAfter
@@ -222,6 +223,54 @@ class TestJobWorker extends SparderBaseFunSuite with BeforeAndAfter {
     eventLoop.stop()
   }
 
+  test("post Permission denied event when PermissionDenied occurred with Spark Exception wraped") {
+    val eventLoop = new KylinJobEventLoop
+    eventLoop.start()
+    val worker = new JobWorker(new HandlePermissionDeniedJobWithSparkExceptionWraped(), Array.empty, eventLoop)
+    val latch = new CountDownLatch(2)
+    val receivePermissionDenied = new AtomicBoolean(false)
+    val listener = new KylinJobListener {
+      override def onReceive(event: KylinJobEvent): Unit = {
+        if (event.isInstanceOf[UnknownThrowable]) {
+          receivePermissionDenied.getAndSet(true)
+        }
+        latch.countDown()
+      }
+    }
+    eventLoop.registerListener(listener)
+    eventLoop.post(RunJob())
+    // receive RunJob and PermissionDenied
+    latch.await(30, TimeUnit.SECONDS)
+    assert(receivePermissionDenied.get())
+    eventLoop.unregisterListener(listener)
+    worker.stop()
+    eventLoop.stop()
+  }
+
+  test("post Permission denied event when PermissionDenied occurred with multiple Spark Exception wraped") {
+    val eventLoop = new KylinJobEventLoop
+    eventLoop.start()
+    val worker = new JobWorker(new HandlePermissionDeniedJobWithMultipleSparkExceptionWraped(), Array.empty, eventLoop)
+    val latch = new CountDownLatch(2)
+    val receivePermissionDenied = new AtomicBoolean(false)
+    val listener = new KylinJobListener {
+      override def onReceive(event: KylinJobEvent): Unit = {
+        if (event.isInstanceOf[UnknownThrowable]) {
+          receivePermissionDenied.getAndSet(true)
+        }
+        latch.countDown()
+      }
+    }
+    eventLoop.registerListener(listener)
+    eventLoop.post(RunJob())
+    // receive RunJob and PermissionDenied
+    latch.await(30, TimeUnit.SECONDS)
+    assert(receivePermissionDenied.get())
+    eventLoop.unregisterListener(listener)
+    worker.stop()
+    eventLoop.stop()
+  }
+
   test("post ResourceLack event when job failed for lack of resource with RuntimeException wraped") {
     val eventLoop = new KylinJobEventLoop
     eventLoop.start()
@@ -246,6 +295,30 @@
+  test("post ResourceLack event when job failed for lack of resource with Spark Exception wraped") {
+    val eventLoop = new KylinJobEventLoop
+    eventLoop.start()
+    val worker = new JobWorker(new HandleResourceLackJobWithSparkExceptionWraped(), Array.empty, eventLoop)
+    val latch = new CountDownLatch(2)
+    val receivePermissionDenied = new AtomicBoolean(false)
+    val listener = new KylinJobListener {
+      override def onReceive(event: KylinJobEvent): Unit = {
+        if (event.isInstanceOf[ResourceLack]) {
+          receivePermissionDenied.getAndSet(true)
+        }
+        latch.countDown()
+      }
+    }
+    eventLoop.registerListener(listener)
+    eventLoop.post(RunJob())
+    // receive RunJob and ResourceLack
+    latch.await(30, TimeUnit.SECONDS)
+    assert(receivePermissionDenied.get())
+    eventLoop.unregisterListener(listener)
+    worker.stop()
+    eventLoop.stop()
+  }
+
   test("post Permission denied event when RuntimeException occurred") {
     val eventLoop = new KylinJobEventLoop
     eventLoop.start()
@@ -338,6 +411,7 @@ class ResourceLackJobWithNonAccessControlException extends SparkApplication {
   override protected def doExecute(): Unit = {}
 }
+
 class HandlePermissionDeniedJobWithRuntimeExceptionWraped extends SparkApplication {
   override def execute(args: Array[String]): Unit = {
     try {
@@ -349,6 +423,42 @@ class HandlePermissionDeniedJobWithRuntimeExceptionWraped extends SparkApplicati
   override protected def doExecute(): Unit = {}
 }
 
+class HandlePermissionDeniedJobWithSparkExceptionWraped extends SparkApplication {
+  override def execute(args: Array[String]): Unit = {
+    try {
+      throw new AccessControlException()
+    } catch {
+      case e: Exception => handleException(new RuntimeException(new SparkException("PermissionDenied", e)))
+    }
+  }
+  override protected def doExecute(): Unit = {}
+}
+
+class HandlePermissionDeniedJobWithMultipleSparkExceptionWraped extends SparkApplication {
+  override def execute(args: Array[String]): Unit = {
+    try {
+      throw new AccessControlException()
+    } catch {
+      case e: Exception => handleException(new RuntimeException(
+        new SparkException("Exception thrown in awaitResult", new SparkException("Job aborted", e))
+      ))
+    }
+  }
+
+  override protected def doExecute(): Unit = {}
+}
+
+class HandleResourceLackJobWithSparkExceptionWraped extends SparkApplication {
+  override def execute(args: Array[String]): Unit = {
+    try {
+      throw new Exception()
+    } catch {
+      case e: Exception => handleException(new RuntimeException(new SparkException("Exception", e)))
+    }
+  }
+  override protected def doExecute(): Unit = {}
+}
+
 class HandleResourceLackJobWithRuntimeExceptionWraped extends SparkApplication {
   override def execute(args: Array[String]): Unit = {
     try {
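Taken together, the new fixtures assert that a permission failure wrapped in one or more SparkExceptions ends on the no-retry path, while other wrapped failures still surface as ResourceLack. A hypothetical standalone sketch of that decision, with illustrative names that are not Kylin APIs (only the two exception classes come from the commit):

import org.apache.hadoop.security.AccessControlException;
import org.apache.spark.SparkException;

public class RetryDecisionSketch {
    enum Decision { NO_RETRY, RETRY }

    // Same unwrapping as extractRealRootCauseFromSparkException, followed by the
    // distinction the tests above verify: permission failures are final, others may retry.
    static Decision decide(Exception e) {
        Throwable cause = e.getCause();
        while (cause instanceof SparkException) {
            cause = cause.getCause();
        }
        return (cause instanceof AccessControlException) ? Decision.NO_RETRY : Decision.RETRY;
    }

    public static void main(String[] args) {
        Exception denied = new RuntimeException(
                new SparkException("Job aborted", new AccessControlException("Permission denied")));
        Exception transientFailure = new RuntimeException(
                new SparkException("Exception", new Exception("executor lost")));
        System.out.println(decide(denied));           // NO_RETRY
        System.out.println(decide(transientFailure)); // RETRY
    }
}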