This is an automated email from the ASF dual-hosted git repository.

abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new f44e1beacb0 HIVE-28623: Implement RuntimeContext to decouple runtime data from TezTask (#5538) (Laszlo Bodor reviewed by Dmitriy Fingerman, Denys Kuzmenko)
f44e1beacb0 is described below

commit f44e1beacb0d8f115c2a0b6d8c88088537aa2e15
Author: Bodor Laszlo <[email protected]>
AuthorDate: Wed Dec 18 11:46:24 2024 +0100

    HIVE-28623: Implement RuntimeContext to decouple runtime data from TezTask (#5538) (Laszlo Bodor reviewed by Dmitriy Fingerman, Denys Kuzmenko)
---
 .../org/apache/hadoop/hive/ql/DriverContext.java   |  17 ++++
 .../org/apache/hadoop/hive/ql/exec/Utilities.java  |   5 +
 .../hadoop/hive/ql/exec/tez/TezRuntimeContext.java | 108 +++++++++++++++++++++
 .../apache/hadoop/hive/ql/exec/tez/TezTask.java    |  40 +++++---
 4 files changed, 157 insertions(+), 13 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java b/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
index 85a4723a42a..f059f41b0ba 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
@@ -28,6 +28,9 @@ import org.apache.hadoop.hive.metastore.api.TxnType;
 import org.apache.hadoop.hive.ql.cache.results.CacheUsage;
 import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache.CacheEntry;
 import org.apache.hadoop.hive.ql.exec.FetchTask;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.tez.TezRuntimeContext;
+import org.apache.hadoop.hive.ql.exec.tez.TezTask;
 import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
 import org.apache.hadoop.hive.ql.plan.mapper.StatsSource;
 
@@ -78,6 +81,8 @@ public class DriverContext {
   private String operationId;
   private String queryErrorMessage;
 
+  private TezRuntimeContext runtimeContext;
+
   public DriverContext(QueryState queryState, QueryInfo queryInfo, HookRunner hookRunner,
       HiveTxnManager initTxnManager) {
     this.queryState = queryState;
@@ -125,6 +130,18 @@ public class DriverContext {
 
   public void setPlan(QueryPlan plan) {
     this.plan = plan;
+    // only set runtimeContext if the plan is not null
+    // we don't want to nullify runtimeContext if this method is called with plan=null, which is the case when e.g.
+    // driver.releasePlan() tries to release resources/objects that are known to be heavy
+    if (plan != null) {
+      this.runtimeContext = Utilities.getFirstTezTask(plan.getRootTasks())
+          .map(TezTask::getRuntimeContext)
+          .orElse(null);
+    }
+  }
+
+  public TezRuntimeContext getRuntimeContext() {
+    return runtimeContext;
   }
 
   public Schema getSchema() {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 4e9ced6728b..95167c3d383 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -31,6 +31,7 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.Serializable;
 import java.io.OutputStream;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -2822,6 +2823,10 @@ public final class Utilities {
     return getTasks(tasks, new TaskFilterFunction<>(TezTask.class));
   }
 
+  public static Optional<TezTask> getFirstTezTask(List<Task<? extends Serializable>> tasks) {
+    return getTezTasks(tasks).stream().findFirst();
+  }
+
   public static List<ExecDriver> getMRTasks(List<Task<?>> tasks) {
     return getTasks(tasks, new TaskFilterFunction<>(ExecDriver.class));
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezRuntimeContext.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezRuntimeContext.java
new file mode 100644
index 00000000000..d119f850a70
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezRuntimeContext.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.common.counters.CounterGroup;
+import org.apache.tez.common.counters.TezCounters;
+
+/**
+ * TezRuntimeContext is a class used by mainly TezTask to store runtime information.
+ */
+public class TezRuntimeContext {
+  //package protected: it's fine to be visible within the tez related package
+  TezCounters counters;
+
+  // dag id of the running dag
+  private String dagId;
+  // tez application id
+  private String appId;
+  // tez session id
+  private String sessionId;
+  // address (host:port) of the AM
+  private String amAddress;
+  private TezJobMonitor monitor;
+  // llap/container
+  private String executionMode;
+
+  public void init(TezClient tezClient) {
+    this.amAddress = tezClient.getAmHost() + ":" + tezClient.getAmPort();
+  }
+
+  public TezCounters getCounters() {
+    return counters;
+  }
+
+  public void setCounters(TezCounters counters) {
+    this.counters = counters;
+  }
+
+  public String getDagId() {
+    return dagId;
+  }
+
+  public void setDagId(String dagId) {
+    this.dagId = dagId;
+  }
+
+  public String getSessionId() {
+    return sessionId;
+  }
+
+  public void setSessionId(String sessionId) {
+    this.sessionId = sessionId;
+  }
+
+  public String getApplicationId() {
+    return appId;
+  }
+
+  public void setApplicationId(String appId) {
+    this.appId = appId;
+  }
+
+  public String getExecutionMode() {
+    return executionMode;
+  }
+
+  public void setExecutionMode(String executionMode) {
+    this.executionMode = executionMode;
+  }
+
+  public String getAmAddress() {
+    return amAddress;
+  }
+
+  public TezJobMonitor getMonitor() {
+    return monitor;
+  }
+
+  public void setMonitor(TezJobMonitor monitor) {
+    this.monitor = monitor;
+  }
+
+  public long getCounter(String groupName, String counterName) {
+    CounterGroup group = getCounters().getGroup(groupName);
+    if (group == null) {
+      return 0;
+    }
+    return group.findCounter(counterName, true).getValue();
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index 2675ae61fa6..b11a414ca9a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -113,7 +113,7 @@ public class TezTask extends Task<TezWork> {
   private final PerfLogger perfLogger = SessionState.getPerfLogger();
   private static final String TEZ_MEMORY_RESERVE_FRACTION = "tez.task.scale.memory.reserve-fraction";
 
-  private TezCounters counters;
+  private final TezRuntimeContext runtimeContext = new TezRuntimeContext();
 
   private final DagUtils utils;
 
@@ -134,11 +134,15 @@ public class TezTask extends Task<TezWork> {
   }
 
   public TezCounters getTezCounters() {
-    return counters;
+    return runtimeContext.getCounters();
   }
 
   public void setTezCounters(final TezCounters counters) {
-    this.counters = counters;
+    runtimeContext.setCounters(counters);
+  }
+
+  public TezRuntimeContext getRuntimeContext() {
+    return runtimeContext;
   }
 
   /**
@@ -196,6 +200,8 @@ public class TezTask extends Task<TezWork> {
           ss.getHiveVariables().get("wmpool"), ss.getHiveVariables().get("wmapp"));
 
       WmContext wmContext = ctx.getWmContext();
+      String executionMode = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_MODE);
+      runtimeContext.setExecutionMode(executionMode);
       // jobConf will hold all the configuration for hadoop, tez, and hive, which are not set in AM defaults
       JobConf jobConf = utils.createConfiguration(conf, false);
 
@@ -261,13 +267,19 @@ public class TezTask extends Task<TezWork> {
 
         // Log all the info required to find the various logs for this query
         String dagId = this.dagClient.getDagIdentifierString();
-        LOG.info("HS2 Host: [{}], Query ID: [{}], Dag ID: [{}], DAG Session ID: [{}]", ServerUtils.hostname(), queryId,
-            dagId, this.dagClient.getSessionIdentifierString());
+        String appId = this.dagClient.getSessionIdentifierString();
+        LOG.info("HS2 Host: [{}], Query ID: [{}], Dag ID: [{}], DAG App ID: [{}]", ServerUtils.hostname(), queryId,
+            dagId, appId);
         LogUtils.putToMDC(LogUtils.DAGID_KEY, dagId);
         this.jobID = dagId;
+        runtimeContext.setDagId(dagId);
+        runtimeContext.setSessionId(session.getSessionId());
+        runtimeContext.setApplicationId(appId);
 
         // finally monitor will print progress until the job is done
-        TezJobMonitor monitor = new TezJobMonitor(work.getAllWork(), dagClient, conf, dag, ctx, counters, perfLogger);
+        TezJobMonitor monitor = new TezJobMonitor(work.getAllWork(), dagClient, conf, dag, ctx, runtimeContext.counters,
+            perfLogger);
+        runtimeContext.setMonitor(monitor);
         rc = monitor.monitorExecution();
 
         if (rc != 0) {
@@ -283,12 +295,13 @@ public class TezTask extends Task<TezWork> {
           TezCounters dagCounters = dagStatus.getDAGCounters();
 
           // if initial counters exists, merge it with dag counters to get aggregated view
-          TezCounters mergedCounters = counters == null ? dagCounters : Utils.mergeTezCounters(dagCounters, counters);
-          counters = mergedCounters;
+          TezCounters mergedCounters = runtimeContext.counters == null ? dagCounters : Utils.mergeTezCounters(
+              dagCounters, runtimeContext.counters);
+          runtimeContext.counters = mergedCounters;
         } catch (Exception err) {
           // Don't fail execution due to counters - just don't print summary info
           LOG.warn("Failed to get counters. Ignoring, summary info will be incomplete.", err);
-          counters = null;
+          runtimeContext.counters = null;
         }
 
         // save useful commit information into query state, e.g. for custom commit hooks, like Iceberg
@@ -320,10 +333,10 @@ public class TezTask extends Task<TezWork> {
         }
       }
 
-      if (LOG.isInfoEnabled() && counters != null
+      if (LOG.isInfoEnabled() && runtimeContext.counters != null
           && (HiveConf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_SUMMARY) ||
           Utilities.isPerfOrAboveLogging(conf))) {
-        for (CounterGroup group: counters) {
+        for (CounterGroup group : runtimeContext.counters) {
           LOG.info(group.getDisplayName() +":");
           for (TezCounter counter: group) {
             LOG.info("   "+counter.getDisplayName()+": "+counter.getValue());
@@ -407,8 +420,8 @@ public class TezTask extends Task<TezWork> {
   }
 
   private void updateNumRows() {
-    if (counters != null) {
-      TezCounter counter = counters.findCounter(
+    if (runtimeContext.counters != null) {
+      TezCounter counter = runtimeContext.counters.findCounter(
         conf.getVar(HiveConf.ConfVars.HIVE_COUNTER_GROUP), FileSinkOperator.TOTAL_TABLE_ROWS_WRITTEN);
       if (counter != null) {
         queryState.setNumModifiedRows(counter.getValue());
@@ -671,6 +684,7 @@ public class TezTask extends Task<TezWork> {
     }
 
     perfLogger.perfLogEnd(CLASS_NAME, PerfLogger.TEZ_SUBMIT_DAG);
+    runtimeContext.init(sessionState.getSession());
     return new SyncDagClient(dagClient);
   }
 

Reply via email to