This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 417f47fa7a7712f93fa7a75ebe62ec867c91d324
Author: huangsheng <huangshen...@163.com>
AuthorDate: Fri Nov 18 11:30:00 2022 +0800

    KYLIN-5409 avoid permission denied job retry
---
 .../engine/spark/application/SparkApplication.java |  14 +-
 .../engine/spark/builder/SnapshotBuilder.scala     | 110 ++++----
 .../org/apache/spark/application/JobWorker.scala   |   8 +-
 .../apache/spark/application/TestJobMonitor.scala  |   2 +-
 .../apache/spark/application/TestJobWorker.scala   | 286 ++++++++++++++++++++-
 5 files changed, 372 insertions(+), 48 deletions(-)
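
A note for readers of this patch: in the build scheduler, a ResourceLack event
leads to a retry while UnknownThrowable fails the job permanently (compare the
retry-count mocking in TestJobMonitor below), so the intent here is to route
permission failures into the no-retry path. A minimal sketch of the resulting
decision rule, using stand-in types rather than Kylin's real classes:

    // AccessDenied stands in for org.apache.hadoop.security.AccessControlException.
    sealed trait JobOutcome
    case object RetryJob extends JobOutcome // scheduler reaction to ResourceLack
    case object FailJob extends JobOutcome  // scheduler reaction to UnknownThrowable

    final class AccessDenied(msg: String) extends Exception(msg)

    def outcome(t: Throwable): JobOutcome = t match {
      case _: AccessDenied => FailJob // direct permission failure
      case r: RuntimeException if r.getCause.isInstanceOf[AccessDenied] =>
        FailJob                       // permission failure wrapped by the engine
      case _ => RetryJob              // e.g. a genuine resource shortage
    }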

diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
index 9cc144902d..42117ac398 100644
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.AccessControlException;
 import org.apache.kylin.cluster.IClusterManager;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.exception.KylinException;
@@ -322,11 +323,22 @@ public abstract class SparkApplication implements Application {
             executeFinish();
         }
     }
-
     protected void handleException(Exception e) throws Exception {
+        if (e instanceof AccessControlException) {
+            interceptAccessControlException(e);
+        }
+        if (e instanceof RuntimeException && e.getCause() instanceof AccessControlException) {
+            interceptAccessControlException(e.getCause());
+        }
         throw e;
     }
 
+    // Permission exceptions will not be retried. Simply let the job fail.
+    protected void interceptAccessControlException(Throwable e) throws NoRetryException {
+        logger.error("Permission denied.", e);
+        throw new NoRetryException("Permission denied.");
+    }
+
     private SparkSession createSpark(SparkConf sparkConf) {
         SparkSession.Builder sessionBuilder = SparkSession.builder()
                .withExtensions(new AbstractFunction1<SparkSessionExtensions, BoxedUnit>() {
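
To summarize the Java change above in one self-contained sketch (Scala here for
consistency with the rest of the patch; NoRetry stands in for Kylin's
NoRetryException, whose import is not shown in this hunk):

    import org.apache.hadoop.security.AccessControlException

    // Stand-in for Kylin's NoRetryException; logging elided for brevity.
    final class NoRetry(msg: String) extends Exception(msg)

    def interceptAccessControlException(cause: Throwable): Nothing =
      throw new NoRetry("Permission denied.") // the patch also logs the cause first

    def handleException(e: Exception): Nothing = e match {
      case ace: AccessControlException => interceptAccessControlException(ace)
      case re: RuntimeException if re.getCause.isInstanceOf[AccessControlException] =>
        interceptAccessControlException(re.getCause)
      case other => throw other // everything else keeps the old retry behavior
    }

Both failure shapes, a bare AccessControlException and one wrapped in a
RuntimeException, end in the same no-retry error, which JobWorker then turns
into an UnknownThrowable event.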
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SnapshotBuilder.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SnapshotBuilder.scala
index b077112498..777c16d464 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SnapshotBuilder.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SnapshotBuilder.scala
@@ -21,6 +21,7 @@ package org.apache.kylin.engine.spark.builder
 import com.google.common.collect.Maps
 import org.apache.commons.codec.digest.DigestUtils
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
+import org.apache.hadoop.security.AccessControlException
 import org.apache.kylin.common.KylinConfig.SetAndUnsetThreadLocalConfig
 import org.apache.kylin.common.persistence.transaction.UnitOfWork
 import org.apache.kylin.common.util.HadoopUtil
@@ -31,6 +32,7 @@ import org.apache.kylin.engine.spark.utils.{FileNames, LogUtils}
 import org.apache.kylin.metadata.model.{NDataModel, NTableMetadataManager, TableDesc, TableExtDesc}
 import org.apache.kylin.metadata.project.NProjectManager
 import org.apache.kylin.source.SourceFactory
+import org.apache.spark.SparkException
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.hive.utils.ResourceDetectUtils
 import org.apache.spark.sql._
@@ -203,6 +205,69 @@ class SnapshotBuilder(var jobId: String) extends Logging with Serializable {
     totalRows
   }
 
+  def executeParallelBuildSnapshot(ss: SparkSession, toBuildTableDesc: Set[TableDesc], baseDir: String,
+                                   snapSizeMap: ConcurrentMap[String, Result], fs: FileSystem, snapshotParallelBuildTimeoutSeconds: Int): Unit = {
+
+    val kylinConf = KylinConfig.getInstanceFromEnv
+    val project = toBuildTableDesc.iterator.next.getProject
+    val stepCheckpoint = getStepCheckpoint(kylinConf.getJobTmpDir(project), fs)
+
+    val service = Executors.newCachedThreadPool()
+    implicit val executorContext = ExecutionContext.fromExecutorService(service)
+    val futures = toBuildTableDesc.map(tableDesc =>
+      Future {
+        var config: SetAndUnsetThreadLocalConfig = null
+        try {
+          if (stepCheckpoint.exists(_.canSkip(tableDesc))) {
+            logInfo(s"Skip snapshot ${tableDesc.getIdentity}")
+          } else {
+            config = KylinConfig.setAndUnsetThreadLocalConfig(kylinConf)
+            buildSingleSnapshotWithoutMd5(ss, tableDesc, baseDir, snapSizeMap)
+            // do step checkpoint
+            stepCheckpoint.map(_.checkpoint(tableDesc))
+          }
+        } catch {
+          case exception: Exception =>
+            logError(s"Error for build snapshot table with $tableDesc", 
exception)
+            throw exception
+        } finally {
+          if (config != null) {
+            config.close()
+          }
+        }
+      }
+    )
+    try {
+      val eventualTuples = Future.sequence(futures.toList)
+      // only throw the first exception
+      ProxyThreadUtils.awaitResult(eventualTuples, snapshotParallelBuildTimeoutSeconds seconds)
+    } catch {
+      case e: SparkException =>
+        ProxyThreadUtils.shutdown(service)
+        e.getCause match {
+          case pd: AccessControlException =>
+            logError(s"Error for await snapshot table result due to 
AccessControlException", pd)
+            throw pd
+          case _ => throw e
+        }
+      case e: Exception =>
+        ProxyThreadUtils.shutdown(service)
+        throw e
+    }
+  }
+
+  def executeSerialBuildSnapshot(ss: SparkSession, toBuildTableDesc: Set[TableDesc], baseDir: String,
+                                 snapSizeMap: ConcurrentMap[String, Result], fs: FileSystem, stepCheckpoint: Option[StepCheckpointSnapshot]): Unit = {
+    toBuildTableDesc.foreach(tableDesc => {
+      if (stepCheckpoint.exists(_.canSkip(tableDesc))) {
+        logInfo(s"Skip snapshot ${tableDesc.getIdentity}")
+      } else {
+        buildSingleSnapshot(ss, tableDesc, baseDir, fs, snapSizeMap)
+        // do step checkpoint
+        stepCheckpoint.map(_.checkpoint(tableDesc))
+      }
+    })
+  }
   // scalastyle:off
  def executeBuildSnapshot(ss: SparkSession, toBuildTableDesc: Set[TableDesc], baseDir: String,
                           isParallelBuild: Boolean, snapshotParallelBuildTimeoutSeconds: Int): util.Map[String, Result] = {
@@ -213,50 +278,9 @@ class SnapshotBuilder(var jobId: String) extends Logging with Serializable {
     val stepCheckpoint = getStepCheckpoint(kylinConf.getJobTmpDir(project), fs)
 
     if (isParallelBuild) {
-      val service = Executors.newCachedThreadPool()
-      implicit val executorContext = ExecutionContext.fromExecutorService(service)
-      val futures = toBuildTableDesc.map(tableDesc =>
-        Future {
-          var config: SetAndUnsetThreadLocalConfig = null
-          try {
-            if (stepCheckpoint.exists(_.canSkip(tableDesc))) {
-              logInfo(s"Skip snapshot ${tableDesc.getIdentity}")
-            } else {
-              config = KylinConfig.setAndUnsetThreadLocalConfig(kylinConf)
-              buildSingleSnapshotWithoutMd5(ss, tableDesc, baseDir, snapSizeMap)
-              // do step checkpoint
-              stepCheckpoint.map(_.checkpoint(tableDesc))
-            }
-          } catch {
-            case exception: Exception =>
-              logError(s"Error for build snapshot table with $tableDesc", 
exception)
-              throw exception
-          } finally {
-            if (config != null) {
-              config.close()
-            }
-          }
-        }
-      )
-      try {
-        val eventualTuples = Future.sequence(futures.toList)
-        // only throw the first exception
-        ProxyThreadUtils.awaitResult(eventualTuples, snapshotParallelBuildTimeoutSeconds seconds)
-      } catch {
-        case e: Exception =>
-          ProxyThreadUtils.shutdown(service)
-          throw e
-      }
+      executeParallelBuildSnapshot(ss, toBuildTableDesc, baseDir, snapSizeMap, fs, snapshotParallelBuildTimeoutSeconds)
     } else {
-      toBuildTableDesc.foreach(tableDesc => {
-        if (stepCheckpoint.exists(_.canSkip(tableDesc))) {
-          logInfo(s"Skip snapshot ${tableDesc.getIdentity}")
-        } else {
-          buildSingleSnapshot(ss, tableDesc, baseDir, fs, snapSizeMap)
-          // do step checkpoint
-          stepCheckpoint.map(_.checkpoint(tableDesc))
-        }
-      })
+      executeSerialBuildSnapshot(ss, toBuildTableDesc, baseDir, snapSizeMap, fs, stepCheckpoint)
     }
     snapSizeMap
   }
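
One subtlety in the parallel path above: ProxyThreadUtils.awaitResult surfaces
a worker's failure wrapped in a SparkException, so the catch block has to
unwrap getCause before the permission failure becomes visible. A stand-alone
illustration of that shape (WrappedFailure plays the role of SparkException;
the manual wrapping simulates what the await layer does):

    import java.util.concurrent.Executors
    import scala.concurrent.{Await, ExecutionContext, Future}
    import scala.concurrent.duration._

    object UnwrapDemo extends App {
      final class AccessDenied(msg: String) extends Exception(msg)
      final class WrappedFailure(cause: Throwable) extends Exception(cause)

      val pool = Executors.newCachedThreadPool()
      implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)

      val snapshot = Future[Unit] { throw new AccessDenied("/kylin/snapshot/T") }
      try {
        try Await.result(snapshot, 10.seconds)
        catch { case t: Throwable => throw new WrappedFailure(t) } // simulate the wrapper
      } catch {
        case w: WrappedFailure => w.getCause match {
          case ace: AccessDenied => println(s"fail fast, no retry: ${ace.getMessage}")
          case _ => throw w
        }
      } finally pool.shutdown()
    }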
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorker.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorker.scala
index 069ac171cc..77b8834f02 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorker.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorker.scala
@@ -18,6 +18,7 @@
 
 package org.apache.spark.application
 
+
 import java.util.concurrent.Executors
 
 import org.apache.kylin.engine.spark.application.SparkApplication
@@ -48,7 +49,6 @@ class JobWorker(application: SparkApplication, args: Array[String], eventLoop: K
     execute()
   }
 
-
   private def execute(): Unit = {
     pool.execute(new Runnable {
       override def run(): Unit = {
@@ -56,6 +56,12 @@ class JobWorker(application: SparkApplication, args: Array[String], eventLoop: K
           application.execute(args)
           eventLoop.post(JobSucceeded())
         } catch {
+          // Unwrap runtime exceptions thrown by SparkApplication.execute(args: Array[String])
+          case runtimeException: RuntimeException =>
+            runtimeException.getCause match {
+              case noRetryException: NoRetryException => eventLoop.post(UnknownThrowable(noRetryException))
+              case cause => eventLoop.post(ResourceLack(if (cause == null) runtimeException else cause))
+            }
           case exception: NoRetryException => eventLoop.post(UnknownThrowable(exception))
           case throwable: Throwable => eventLoop.post(ResourceLack(throwable))
         }
diff --git a/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobMonitor.scala b/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobMonitor.scala
index b9a1ff87c2..1864d8749c 100644
--- a/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobMonitor.scala
+++ b/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobMonitor.scala
@@ -22,6 +22,7 @@ import java.util
 import java.util.concurrent.CountDownLatch
 import java.util.concurrent.atomic.AtomicBoolean
 import com.amazonaws.services.s3.model.AmazonS3Exception
+import org.apache.hadoop.security.AccessControlException
 import org.apache.kylin.cluster.{AvailableResource, IClusterManager, ResourceInfo}
 import org.apache.kylin.common.KylinConfig
 import org.apache.kylin.engine.spark.job.KylinBuildEnv
@@ -307,7 +308,6 @@ class TestJobMonitor extends SparderBaseFunSuite with BeforeAndAfterEach {
     }
   }
 
-
   test("post JobFailed event when receive class not found event") {
     withEventLoop { eventLoop =>
       Mockito.when(config.getSparkEngineMaxRetryTime).thenReturn(1)
diff --git a/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobWorker.scala b/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobWorker.scala
index 73035a24c3..c1cfa95387 100644
--- a/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobWorker.scala
+++ b/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobWorker.scala
@@ -18,9 +18,10 @@
 
 package org.apache.spark.application
 
-import java.util.concurrent.CountDownLatch
-import java.util.concurrent.atomic.AtomicBoolean
+import org.apache.hadoop.security.AccessControlException
 
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+import java.util.concurrent.atomic.AtomicBoolean
 import org.apache.kylin.engine.spark.application.SparkApplication
 import org.apache.kylin.engine.spark.scheduler._
 import org.apache.spark.scheduler.KylinJobEventLoop
@@ -53,6 +54,54 @@ class TestJobWorker extends SparderBaseFunSuite with BeforeAndAfter {
     eventLoop.stop()
   }
 
+  test("post ResourceLack event when job failed for 
non-AccessControlException") {
+    val eventLoop = new KylinJobEventLoop
+    eventLoop.start()
+    val worker = new JobWorker(new ResourceLackJobWithNonAccessControlException(), Array.empty, eventLoop)
+    val latch = new CountDownLatch(2)
+    val receiveResourceLack = new AtomicBoolean(false)
+    val listener = new KylinJobListener {
+      override def onReceive(event: KylinJobEvent): Unit = {
+        if (event.isInstanceOf[ResourceLack]) {
+          receiveResourceLack.getAndSet(true)
+        }
+        latch.countDown()
+      }
+    }
+    eventLoop.registerListener(listener)
+    eventLoop.post(RunJob())
+    // receive RunJob and ResourceLack
+    latch.await(30, TimeUnit.SECONDS)
+    assert(receiveResourceLack.get())
+    eventLoop.unregisterListener(listener)
+    worker.stop()
+    eventLoop.stop()
+  }
+
+  test("post ResourceLack event when job failed with runtime exception for 
lack of resource") {
+    val eventLoop = new KylinJobEventLoop
+    eventLoop.start()
+    val worker = new JobWorker(new ResourceLackJobWithRuntimeException(), Array.empty, eventLoop)
+    val latch = new CountDownLatch(2)
+    val receiveResourceLack = new AtomicBoolean(false)
+    val listener = new KylinJobListener {
+      override def onReceive(event: KylinJobEvent): Unit = {
+        if (event.isInstanceOf[ResourceLack]) {
+          receiveResourceLack.getAndSet(true)
+        }
+        latch.countDown()
+      }
+    }
+    eventLoop.registerListener(listener)
+    eventLoop.post(RunJob())
+    // receive RunJob and ResourceLack
+    latch.await(30, TimeUnit.SECONDS)
+    assert(receiveResourceLack.get())
+    eventLoop.unregisterListener(listener)
+    worker.stop()
+    eventLoop.stop()
+  }
+
   test("post JobSucceeded event when job succeeded") {
     val eventLoop = new KylinJobEventLoop
     eventLoop.start()
@@ -100,8 +149,153 @@ class TestJobWorker extends SparderBaseFunSuite with BeforeAndAfter {
     worker.stop()
     eventLoop.stop()
   }
+
+  test("post Permission denied event when PermissionDenied occurred with 
handle Exception function") {
+    val eventLoop = new KylinJobEventLoop
+    eventLoop.start()
+    val worker = new JobWorker(new InterceptPermissionDeniedJob(), Array.empty, eventLoop)
+    val latch = new CountDownLatch(2)
+    val receivePermissionDenied = new AtomicBoolean(false)
+    val listener = new KylinJobListener {
+      override def onReceive(event: KylinJobEvent): Unit = {
+        if (event.isInstanceOf[UnknownThrowable]) {
+          receivePermissionDenied.getAndSet(true)
+        }
+        latch.countDown()
+      }
+    }
+    eventLoop.registerListener(listener)
+    eventLoop.post(RunJob())
+    // receive RunJob and PermissionDenied
+    latch.await(30, TimeUnit.SECONDS)
+    assert(receivePermissionDenied.get())
+    eventLoop.unregisterListener(listener)
+    worker.stop()
+    eventLoop.stop()
+  }
+
+  test("post Permission denied event when PermissionDenied occurred with 
interceptAccessControlException function") {
+    val eventLoop = new KylinJobEventLoop
+    eventLoop.start()
+    val worker = new JobWorker(new HandlePermissionDeniedJobWithAccessControlException(), Array.empty, eventLoop)
+    val latch = new CountDownLatch(2)
+    val receivePermissionDenied = new AtomicBoolean(false)
+    val listener = new KylinJobListener {
+      override def onReceive(event: KylinJobEvent): Unit = {
+        if (event.isInstanceOf[UnknownThrowable]) {
+          receivePermissionDenied.getAndSet(true)
+        }
+        latch.countDown()
+      }
+    }
+    eventLoop.registerListener(listener)
+    eventLoop.post(RunJob())
+    // receive RunJob and PermissionDenied
+    latch.await(30, TimeUnit.SECONDS)
+    assert(receivePermissionDenied.get())
+    eventLoop.unregisterListener(listener)
+    worker.stop()
+    eventLoop.stop()
+  }
+
+  test("post Permission denied event when PermissionDenied occurred with 
RuntimeException wraped") {
+    val eventLoop = new KylinJobEventLoop
+    eventLoop.start()
+    val worker = new JobWorker(new HandlePermissionDeniedJobWithRuntimeExceptionWrapped(), Array.empty, eventLoop)
+    val latch = new CountDownLatch(2)
+    val receivePermissionDenied = new AtomicBoolean(false)
+    val listener = new KylinJobListener {
+      override def onReceive(event: KylinJobEvent): Unit = {
+        if (event.isInstanceOf[UnknownThrowable]) {
+          receivePermissionDenied.getAndSet(true)
+        }
+        latch.countDown()
+      }
+    }
+    eventLoop.registerListener(listener)
+    eventLoop.post(RunJob())
+    // receive RunJob and PermissionDenied
+    latch.await(30, TimeUnit.SECONDS)
+    assert(receivePermissionDenied.get())
+    eventLoop.unregisterListener(listener)
+    worker.stop()
+    eventLoop.stop()
+  }
+
+  test("post ResourceLack event when job failed for lack of resource with 
RuntimeException wraped") {
+    val eventLoop = new KylinJobEventLoop
+    eventLoop.start()
+    val worker = new JobWorker(new HandleResourceLackJobWithRuntimeExceptionWrapped(), Array.empty, eventLoop)
+    val latch = new CountDownLatch(2)
+    val receiveResourceLack = new AtomicBoolean(false)
+    val listener = new KylinJobListener {
+      override def onReceive(event: KylinJobEvent): Unit = {
+        if (event.isInstanceOf[ResourceLack]) {
+          receiveResourceLack.getAndSet(true)
+        }
+        latch.countDown()
+      }
+    }
+    eventLoop.registerListener(listener)
+    eventLoop.post(RunJob())
+    // receive RunJob and ResourceLack
+    latch.await(30, TimeUnit.SECONDS)
+    assert(receiveResourceLack.get())
+    eventLoop.unregisterListener(listener)
+    worker.stop()
+    eventLoop.stop()
+  }
+
+  test("post Permission denied event when RuntimeException occurred") {
+    val eventLoop = new KylinJobEventLoop
+    eventLoop.start()
+    val worker = new JobWorker(new PermissionDeniedJobWithRuntimeExceptionWrapped(), Array.empty, eventLoop)
+    val latch = new CountDownLatch(2)
+    val receivePermissionDenied = new AtomicBoolean(false)
+    val listener = new KylinJobListener {
+      override def onReceive(event: KylinJobEvent): Unit = {
+        if (event.isInstanceOf[UnknownThrowable]) {
+          receivePermissionDenied.getAndSet(true)
+        }
+        latch.countDown()
+      }
+    }
+    eventLoop.registerListener(listener)
+    eventLoop.post(RunJob())
+    // receive RunJob and PermissionDenied
+    latch.await(30, TimeUnit.SECONDS)
+    assert(receivePermissionDenied.get())
+    eventLoop.unregisterListener(listener)
+    worker.stop()
+    eventLoop.stop()
+  }
+  test("post Permission denied event when AccessControlException occurred") {
+    val eventLoop = new KylinJobEventLoop
+    eventLoop.start()
+    val worker = new JobWorker(new PermissionDeniedJobWithNoRetryException(), 
Array.empty, eventLoop)
+    val latch = new CountDownLatch(2)
+    val receivePermissionDenied = new AtomicBoolean(false)
+    val listener = new KylinJobListener {
+      override def onReceive(event: KylinJobEvent): Unit = {
+        if (event.isInstanceOf[UnknownThrowable]) {
+          receivePermissionDenied.getAndSet(true)
+        }
+        latch.countDown()
+      }
+    }
+    eventLoop.registerListener(listener)
+    eventLoop.post(RunJob())
+    // receive RunJob and PermissionDenied
+    latch.await(30, TimeUnit.SECONDS)
+    assert(receivePermissionDenied.get())
+    eventLoop.unregisterListener(listener)
+    worker.stop()
+    eventLoop.stop()
+  }
 }
 
+
+
 class UnknownThrowableJob extends SparkApplication {
   override def execute(args: Array[String]): Unit = {
     throw new NoRetryException()
@@ -110,6 +304,81 @@ class UnknownThrowableJob extends SparkApplication {
   override protected def doExecute(): Unit = {}
 }
 
+class InterceptPermissionDeniedJob extends SparkApplication {
+  override def execute(args: Array[String]): Unit = {
+    try {
+      throw new AccessControlException()
+    } catch {
+      case e: AccessControlException =>
+        interceptAccessControlException(e)
+    }
+  }
+  override protected def doExecute(): Unit = {}
+}
+
+class HandlePermissionDeniedJobWithAccessControlException extends SparkApplication {
+  override def execute(args: Array[String]): Unit = {
+    try {
+      throw new AccessControlException()
+    } catch {
+      case e: Exception => handleException(e)
+    }
+  }
+  override protected def doExecute(): Unit = {}
+}
+
+class ResourceLackJobWithNonAccessControlException extends SparkApplication {
+  override def execute(args: Array[String]): Unit = {
+    try {
+      throw new Exception
+    } catch {
+      case e: Exception => handleException(e)
+    }
+  }
+  override protected def doExecute(): Unit = {}
+}
+
+class HandlePermissionDeniedJobWithRuntimeExceptionWrapped extends SparkApplication {
+  override def execute(args: Array[String]): Unit = {
+    try {
+      throw new AccessControlException()
+    } catch {
+      case e: Exception => handleException(new RuntimeException("PermissionDenied", e))
+    }
+  }
+  override protected def doExecute(): Unit = {}
+}
+
+class HandleResourceLackJobWithRuntimeExceptionWrapped extends SparkApplication {
+  override def execute(args: Array[String]): Unit = {
+    try {
+      throw new Exception()
+    } catch {
+      case e: Exception => handleException(new RuntimeException("Resource 
Lack", e))
+    }
+  }
+  override protected def doExecute(): Unit = {}
+}
+
+
+class PermissionDeniedJobWithRuntimeExceptionWrapped extends SparkApplication {
+  override def execute(args: Array[String]): Unit = {
+    try {
+      throw new AccessControlException()
+    } catch {
+      case e: Exception => throw new RuntimeException("Error execute " + this.getClass.getName, new NoRetryException("Permission denied."))
+    }
+  }
+  override protected def doExecute(): Unit = {}
+}
+
+class PermissionDeniedJobWithNoRetryException extends SparkApplication {
+  override def execute(args: Array[String]): Unit = {
+    throw new NoRetryException("Permission Denied")
+  }
+  override protected def doExecute(): Unit = {}
+}
+
 class ResourceLackJob extends SparkApplication {
 
   override def execute(args: Array[String]): Unit = {
@@ -119,6 +388,19 @@ class ResourceLackJob extends SparkApplication {
   override protected def doExecute(): Unit = {}
 }
 
+class ResourceLackJobWithRuntimeException extends SparkApplication {
+
+  override def execute(args: Array[String]): Unit = {
+    try {
+      throw new Exception()
+    } catch {
+      case e: Exception => throw new RuntimeException("Error execute " + 
this.getClass.getName, e)
+    }
+  }
+
+  override protected def doExecute(): Unit = {}
+}
+
 class MockSucceedJob extends SparkApplication {
   override def execute(args: Array[String]): Unit = {}
 
