This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch dev-1.3.1
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
The following commit(s) were added to refs/heads/dev-1.3.1 by this push:
new 1b94f4302 [Optimize] move to the end and add try catch block to ensure execute (#3833)
1b94f4302 is described below
commit 1b94f43023ed4b5bcea7e3167c2063df69a61a16
Author: 人生有如两个橘子 <[email protected]>
AuthorDate: Wed Nov 16 15:06:29 2022 +0800
[Optimize] move to the end and add try catch block to ensure execute (#3833)
---
.../linkis/entrance/execute/EntranceJob.scala | 139 ++++++++++-----------
1 file changed, 68 insertions(+), 71 deletions(-)
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceJob.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceJob.scala
index 88b035416..ae414c3c8 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceJob.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/execute/EntranceJob.scala
@@ -135,95 +135,92 @@ abstract class EntranceJob extends Job {
fromState: SchedulerEventState,
toState: SchedulerEventState
): Unit = {
- super.afterStateChanged(fromState, toState)
- toState match {
- case Scheduled =>
- if (getJobRequest.getMetrics == null) {
- getLogListener.foreach(
- _.onLogUpdate(this, LogUtils.generateWarn("Job Metrics has not
been initialized."))
- )
- } else {
- if
(getJobRequest.getMetrics.containsKey(TaskConstant.ENTRANCEJOB_SCHEDULE_TIME)) {
+ try {
+ toState match {
+ case Scheduled =>
+ if (getJobRequest.getMetrics == null) {
getLogListener.foreach(
- _.onLogUpdate(
- this,
- LogUtils.generateWarn("Your job has already been scheduled
before.")
- )
+ _.onLogUpdate(this, LogUtils.generateWarn("Job Metrics has not
been initialized."))
)
} else {
- getJobRequest.getMetrics.put(
- TaskConstant.ENTRANCEJOB_SCHEDULE_TIME,
- new Date(System.currentTimeMillis)
- )
+ if
(getJobRequest.getMetrics.containsKey(TaskConstant.ENTRANCEJOB_SCHEDULE_TIME)) {
+ getLogListener.foreach(
+ _.onLogUpdate(
+ this,
+ LogUtils.generateWarn("Your job has already been scheduled
before.")
+ )
+ )
+ } else {
+ getJobRequest.getMetrics.put(
+ TaskConstant.ENTRANCEJOB_SCHEDULE_TIME,
+ new Date(System.currentTimeMillis)
+ )
+ }
}
- }
- getLogListener.foreach(
- _.onLogUpdate(
- this,
- LogUtils.generateInfo("Your job is Scheduled. Please wait it to
run.")
- )
- )
- case WaitForRetry =>
- getLogListener.foreach(
- _.onLogUpdate(
- this,
- LogUtils.generateInfo("Your job is turn to retry. Please wait it
to schedule.")
- )
- )
- case Running =>
- getLogListener.foreach(
- _.onLogUpdate(
- this,
- LogUtils.generateInfo("Your job is Running now. Please wait it to
complete.")
+ getLogListener.foreach(
+ _.onLogUpdate(
+ this,
+ LogUtils.generateInfo("Your job is Scheduled. Please wait it to
run.")
+ )
)
- )
-
- case _ if SchedulerEventState.isCompleted(toState) =>
- endTime = System.currentTimeMillis()
-
- getJobRequest.getMetrics.put(
- TaskConstant.ENTRANCEJOB_COMPLETE_TIME,
- new Date(System.currentTimeMillis())
- )
- if (getJobInfo != null) {
- getLogListener.foreach(_.onLogUpdate(this,
LogUtils.generateInfo(getJobInfo.getMetric)))
- }
- if (isSucceed) {
+ case WaitForRetry =>
getLogListener.foreach(
_.onLogUpdate(
this,
- LogUtils.generateInfo("Congratulations. Your job completed with
status Success.")
+ LogUtils.generateInfo("Your job is turn to retry. Please wait it
to schedule.")
)
)
- } else {
+ case Running =>
getLogListener.foreach(
_.onLogUpdate(
this,
- LogUtils.generateInfo(
- s"Sorry. Your job completed with a status $toState. You can
view logs for the reason."
- )
+ LogUtils.generateInfo("Your job is Running now. Please wait it
to complete.")
)
)
- }
- this.setProgress(EntranceJob.JOB_COMPLETED_PROGRESS)
- entranceListenerBus.foreach(
- _.post(
- EntranceProgressEvent(this, EntranceJob.JOB_COMPLETED_PROGRESS,
this.getProgressInfo)
+ case _ if SchedulerEventState.isCompleted(toState) =>
+ getJobRequest.getMetrics.put(
+ TaskConstant.ENTRANCEJOB_COMPLETE_TIME,
+ new Date(System.currentTimeMillis())
)
- )
- this.getProgressListener.foreach(listener =>
- listener.onProgressUpdate(
- this,
- EntranceJob.JOB_COMPLETED_PROGRESS,
- Array[JobProgressInfo]()
+ if (getJobInfo != null) {
+ getLogListener.foreach(_.onLogUpdate(this,
LogUtils.generateInfo(getJobInfo.getMetric)))
+ }
+ if (isSucceed) {
+ getLogListener.foreach(
+ _.onLogUpdate(
+ this,
+ LogUtils.generateInfo("Congratulations. Your job completed
with status Success.")
+ )
+ )
+ } else {
+ getLogListener.foreach(
+ _.onLogUpdate(
+ this,
+ LogUtils.generateInfo(
+ s"Sorry. Your job completed with a status $toState. You can
view logs for the reason."
+ )
+ )
+ )
+ }
+ this.setProgress(EntranceJob.JOB_COMPLETED_PROGRESS)
+ entranceListenerBus.foreach(
+ _.post(
+ EntranceProgressEvent(this, EntranceJob.JOB_COMPLETED_PROGRESS,
this.getProgressInfo)
+ )
)
- )
- getEntranceContext
- .getOrCreatePersistenceManager()
- .createPersistenceEngine()
- .updateIfNeeded(getJobRequest)
- case _ =>
+ this.getProgressListener.foreach(listener =>
+ listener.onProgressUpdate(
+ this,
+ EntranceJob.JOB_COMPLETED_PROGRESS,
+ Array[JobProgressInfo]()
+ )
+ )
+ case _ =>
+ }
+ } catch {
+ case e: Exception => logger.error("Failed to match state", e)
}
+ super.afterStateChanged(fromState, toState)
entranceListenerBus.foreach(_.post(EntranceJobEvent(this.getId())))
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]