This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git

commit 521d30ccd25f611226f498324dec07591a44ed2a
Author: peacewong <[email protected]>
AuthorDate: Tue Oct 10 20:34:51 2023 +0800

    optimize hive task progress
---
 .../hive/src/main/resources/log4j2.xml             | 11 +++++-
 .../hive/executor/HiveEngineConnExecutor.scala     | 43 ++++++++++++++++------
 2 files changed, 41 insertions(+), 13 deletions(-)

diff --git a/linkis-engineconn-plugins/hive/src/main/resources/log4j2.xml 
b/linkis-engineconn-plugins/hive/src/main/resources/log4j2.xml
index 4d3b5855b..b56efdb36 100644
--- a/linkis-engineconn-plugins/hive/src/main/resources/log4j2.xml
+++ b/linkis-engineconn-plugins/hive/src/main/resources/log4j2.xml
@@ -34,6 +34,10 @@
 
         <Send name="Send" >
             <Filters>
+                <RegexFilter regex=".*Hive-on-MR is deprecated in Hive \d+ and 
may not be available in the future versions\..*" onMatch="DENY" 
onMismatch="NEUTRAL"/>
+                <RegexFilter regex=".*Hadoop command-line option parsing 
not.*" onMatch="DENY" onMismatch="NEUTRAL"/>
+                <RegexFilter regex="Group.*is deprecated. Use.*instead" 
onMatch="DENY" onMismatch="NEUTRAL"/>
+                <RegexFilter regex="Failed to get files with ID; using regular 
API.*" onMatch="DENY" onMismatch="NEUTRAL"/>
                 <ThresholdFilter level="WARN" onMatch="ACCEPT" 
onMismatch="DENY" />
             </Filters>
             <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t] 
%logger{36} %L %M [JobId-%X{jobId}] - %msg%xEx%n"/>
@@ -97,8 +101,11 @@
         <logger name="org.apache.hadoop.mapreduce.Job" level="INFO" 
additivity="true">
             <appender-ref ref="YarnAppIdOutputFile"/>
         </logger>
-        <logger name="org.apache.tez.client.TezClient" level="INFO" 
additivity="true">
-            <appender-ref ref="YarnAppIdOutputFile"/>
+        <logger name="org.apache.hadoop.hive.conf.HiveConf" level="ERROR" 
additivity="true">
+            <appender-ref ref="Send"/>
+        </logger>
+        <logger name="org.apache.hadoop.mapreduce.split.JobSplitWriter" 
level="ERROR" additivity="true">
+            <appender-ref ref="Send"/>
         </logger>
    </loggers>
 </configuration>
diff --git 
a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala
 
b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala
index bf4eff0cb..abe32d76d 100644
--- 
a/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala
+++ 
b/linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConnExecutor.scala
@@ -56,6 +56,7 @@ import org.apache.hadoop.hive.common.HiveInterruptUtils
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.metastore.api.{FieldSchema, Schema}
 import org.apache.hadoop.hive.ql.{Driver, QueryPlan}
+import org.apache.hadoop.hive.ql.exec.Task.TaskState
 import org.apache.hadoop.hive.ql.exec.Utilities
 import org.apache.hadoop.hive.ql.exec.mr.HadoopJobExecHelper
 import org.apache.hadoop.hive.ql.exec.tez.TezJobExecHelper
@@ -204,9 +205,11 @@ class HiveEngineConnExecutor(
             var compileRet = -1
             Utils.tryCatch {
               compileRet = driver.compile(realCode)
-              logger.info(s"driver compile realCode : ${realCode} finished, 
status : ${compileRet}")
+              logger.info(
+                s"driver compile realCode : \n ${realCode} \n finished, status 
: ${compileRet}"
+              )
               if (0 != compileRet) {
-                logger.warn(s"compile realCode : ${realCode} error status : 
${compileRet}")
+                logger.warn(s"compile realCode : \n ${realCode} \n error 
status : ${compileRet}")
                 throw HiveQueryFailedException(
                   COMPILE_HIVE_QUERY_ERROR.getErrorCode,
                   COMPILE_HIVE_QUERY_ERROR.getErrorDesc
@@ -469,17 +472,35 @@ class HiveEngineConnExecutor(
       val totalSQLs = engineExecutorContext.getTotalParagraph
       val currentSQL = engineExecutorContext.getCurrentParagraph
       val currentBegin = (currentSQL - 1) / totalSQLs.asInstanceOf[Float]
-      HadoopJobExecHelper.runningJobs synchronized {
-        HadoopJobExecHelper.runningJobs.asScala foreach { runningJob =>
-          val name = runningJob.getID.toString
-          val _progress = runningJob.reduceProgress() + 
runningJob.mapProgress()
-          singleSqlProgressMap.put(name, _progress / 2)
+      val finishedStage =
+        if (null != driver && null != driver.getPlan() && 
!driver.getPlan().getRootTasks.isEmpty) {
+          Utils.tryQuietly(
+            Utilities
+              .getMRTasks(driver.getPlan().getRootTasks)
+              .asScala
+              .count(task => task.isMapRedTask && task.getTaskState == 
TaskState.FINISHED)
+          )
+        } else {
+          0
         }
-      }
       var totalProgress: Float = 0.0f
+      if (!HadoopJobExecHelper.runningJobs.isEmpty) {
+        val runningJob = HadoopJobExecHelper.runningJobs.get(0)
+        val _progress = Utils.tryCatch(runningJob.reduceProgress() + 
runningJob.mapProgress()) {
+          case e: Exception =>
+            logger.info(s"Failed to get job(${runningJob.getJobName}) progress 
", e)
+            0.2f
+        }
+        if (!_progress.isNaN) {
+          totalProgress = _progress / 2
+        }
+      }
+      logger.info(
+        s"Running stage  progress is $totalProgress, and finished stage is 
$finishedStage"
+      )
       val hiveRunJobs = if (numberOfMRJobs <= 0) 1 else numberOfMRJobs
-      singleSqlProgressMap.asScala foreach { case (_name, _progress) =>
-        totalProgress += _progress
+      if (finishedStage <= hiveRunJobs) {
+        totalProgress = totalProgress + finishedStage
       }
       try {
         totalProgress = totalProgress / (hiveRunJobs * totalSQLs)
@@ -488,10 +509,10 @@ class HiveEngineConnExecutor(
         case _ => totalProgress = 0.0f
       }
 
-      logger.debug(s"hive progress is $totalProgress")
       val newProgress =
         if (totalProgress.isNaN || totalProgress.isInfinite) currentBegin
         else totalProgress + currentBegin
+      logger.info(s"Hive progress is $newProgress, and finished stage is 
$finishedStage")
       val oldProgress = 
ProgressUtils.getOldProgress(this.engineExecutorContext)
       if (newProgress < oldProgress) oldProgress
       else {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org

Reply via email to