This is an automated email from the ASF dual-hosted git repository.
abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new f44e1beacb0 HIVE-28623: Implement RuntimeContext to decouple runtime
data from TezTask (#5538) (Laszlo Bodor reviewed by Dmitriy Fingerman, Denys
Kuzmenko)
f44e1beacb0 is described below
commit f44e1beacb0d8f115c2a0b6d8c88088537aa2e15
Author: Bodor Laszlo <[email protected]>
AuthorDate: Wed Dec 18 11:46:24 2024 +0100
HIVE-28623: Implement RuntimeContext to decouple runtime data from TezTask
(#5538) (Laszlo Bodor reviewed by Dmitriy Fingerman, Denys Kuzmenko)
---
.../org/apache/hadoop/hive/ql/DriverContext.java | 17 ++++
.../org/apache/hadoop/hive/ql/exec/Utilities.java | 5 +
.../hadoop/hive/ql/exec/tez/TezRuntimeContext.java | 108 +++++++++++++++++++++
.../apache/hadoop/hive/ql/exec/tez/TezTask.java | 40 +++++---
4 files changed, 157 insertions(+), 13 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java b/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
index 85a4723a42a..f059f41b0ba 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
@@ -28,6 +28,9 @@ import org.apache.hadoop.hive.metastore.api.TxnType;
import org.apache.hadoop.hive.ql.cache.results.CacheUsage;
import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache.CacheEntry;
import org.apache.hadoop.hive.ql.exec.FetchTask;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.tez.TezRuntimeContext;
+import org.apache.hadoop.hive.ql.exec.tez.TezTask;
import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
import org.apache.hadoop.hive.ql.plan.mapper.StatsSource;
@@ -78,6 +81,8 @@ public class DriverContext {
private String operationId;
private String queryErrorMessage;
+ private TezRuntimeContext runtimeContext;
+
public DriverContext(QueryState queryState, QueryInfo queryInfo, HookRunner hookRunner,
HiveTxnManager initTxnManager) {
this.queryState = queryState;
@@ -125,6 +130,18 @@ public class DriverContext {
public void setPlan(QueryPlan plan) {
this.plan = plan;
+ // only set runtimeContext if the plan is not null
+ // we don't want to nullify runtimeContext if this method is called with plan=null, which is the case when e.g.
+ // driver.releasePlan() tries to release resources/objects that are known to be heavy
+ if (plan != null) {
+ this.runtimeContext = Utilities.getFirstTezTask(plan.getRootTasks())
+ .map(TezTask::getRuntimeContext)
+ .orElse(null);
+ }
+ }
+
+ public TezRuntimeContext getRuntimeContext() {
+ return runtimeContext;
}
public Schema getSchema() {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 4e9ced6728b..95167c3d383 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -31,6 +31,7 @@ import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
+import java.io.Serializable;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
@@ -2822,6 +2823,10 @@ public final class Utilities {
return getTasks(tasks, new TaskFilterFunction<>(TezTask.class));
}
+ public static Optional<TezTask> getFirstTezTask(List<Task<? extends Serializable>> tasks) {
+ return getTezTasks(tasks).stream().findFirst();
+ }
+
public static List<ExecDriver> getMRTasks(List<Task<?>> tasks) {
return getTasks(tasks, new TaskFilterFunction<>(ExecDriver.class));
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezRuntimeContext.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezRuntimeContext.java
new file mode 100644
index 00000000000..d119f850a70
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezRuntimeContext.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.common.counters.CounterGroup;
+import org.apache.tez.common.counters.TezCounters;
+
+/**
+ * TezRuntimeContext is a class used by mainly TezTask to store runtime information.
+ */
+public class TezRuntimeContext {
+ //package protected: it's fine to be visible within the tez related package
+ TezCounters counters;
+
+ // dag id of the running dag
+ private String dagId;
+ // tez application id
+ private String appId;
+ // tez session id
+ private String sessionId;
+ // address (host:port) of the AM
+ private String amAddress;
+ private TezJobMonitor monitor;
+ // llap/container
+ private String executionMode;
+
+ public void init(TezClient tezClient) {
+ this.amAddress = tezClient.getAmHost() + ":" + tezClient.getAmPort();
+ }
+
+ public TezCounters getCounters() {
+ return counters;
+ }
+
+ public void setCounters(TezCounters counters) {
+ this.counters = counters;
+ }
+
+ public String getDagId() {
+ return dagId;
+ }
+
+ public void setDagId(String dagId) {
+ this.dagId = dagId;
+ }
+
+ public String getSessionId() {
+ return sessionId;
+ }
+
+ public void setSessionId(String sessionId) {
+ this.sessionId = sessionId;
+ }
+
+ public String getApplicationId() {
+ return appId;
+ }
+
+ public void setApplicationId(String appId) {
+ this.appId = appId;
+ }
+
+ public String getExecutionMode() {
+ return executionMode;
+ }
+
+ public void setExecutionMode(String executionMode) {
+ this.executionMode = executionMode;
+ }
+
+ public String getAmAddress() {
+ return amAddress;
+ }
+
+ public TezJobMonitor getMonitor() {
+ return monitor;
+ }
+
+ public void setMonitor(TezJobMonitor monitor) {
+ this.monitor = monitor;
+ }
+
+ public long getCounter(String groupName, String counterName) {
+ CounterGroup group = getCounters().getGroup(groupName);
+ if (group == null) {
+ return 0;
+ }
+ return group.findCounter(counterName, true).getValue();
+ }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index 2675ae61fa6..b11a414ca9a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -113,7 +113,7 @@ public class TezTask extends Task<TezWork> {
private final PerfLogger perfLogger = SessionState.getPerfLogger();
private static final String TEZ_MEMORY_RESERVE_FRACTION =
"tez.task.scale.memory.reserve-fraction";
- private TezCounters counters;
+ private final TezRuntimeContext runtimeContext = new TezRuntimeContext();
private final DagUtils utils;
@@ -134,11 +134,15 @@ public class TezTask extends Task<TezWork> {
}
public TezCounters getTezCounters() {
- return counters;
+ return runtimeContext.getCounters();
}
public void setTezCounters(final TezCounters counters) {
- this.counters = counters;
+ runtimeContext.setCounters(counters);
+ }
+
+ public TezRuntimeContext getRuntimeContext() {
+ return runtimeContext;
}
/**
@@ -196,6 +200,8 @@ public class TezTask extends Task<TezWork> {
ss.getHiveVariables().get("wmpool"),
ss.getHiveVariables().get("wmapp"));
WmContext wmContext = ctx.getWmContext();
+ String executionMode = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_MODE);
+ runtimeContext.setExecutionMode(executionMode);
// jobConf will hold all the configuration for hadoop, tez, and hive, which are not set in AM defaults
JobConf jobConf = utils.createConfiguration(conf, false);
@@ -261,13 +267,19 @@ public class TezTask extends Task<TezWork> {
// Log all the info required to find the various logs for this query
String dagId = this.dagClient.getDagIdentifierString();
- LOG.info("HS2 Host: [{}], Query ID: [{}], Dag ID: [{}], DAG Session ID: [{}]", ServerUtils.hostname(), queryId,
- dagId, this.dagClient.getSessionIdentifierString());
+ String appId = this.dagClient.getSessionIdentifierString();
+ LOG.info("HS2 Host: [{}], Query ID: [{}], Dag ID: [{}], DAG App ID: [{}]", ServerUtils.hostname(), queryId,
+ dagId, appId);
LogUtils.putToMDC(LogUtils.DAGID_KEY, dagId);
this.jobID = dagId;
+ runtimeContext.setDagId(dagId);
+ runtimeContext.setSessionId(session.getSessionId());
+ runtimeContext.setApplicationId(appId);
// finally monitor will print progress until the job is done
- TezJobMonitor monitor = new TezJobMonitor(work.getAllWork(), dagClient, conf, dag, ctx, counters, perfLogger);
+ TezJobMonitor monitor = new TezJobMonitor(work.getAllWork(), dagClient, conf, dag, ctx, runtimeContext.counters,
+ perfLogger);
+ runtimeContext.setMonitor(monitor);
rc = monitor.monitorExecution();
if (rc != 0) {
@@ -283,12 +295,13 @@ public class TezTask extends Task<TezWork> {
TezCounters dagCounters = dagStatus.getDAGCounters();
// if initial counters exists, merge it with dag counters to get aggregated view
- TezCounters mergedCounters = counters == null ? dagCounters : Utils.mergeTezCounters(dagCounters, counters);
- counters = mergedCounters;
+ TezCounters mergedCounters = runtimeContext.counters == null ? dagCounters : Utils.mergeTezCounters(
+ dagCounters, runtimeContext.counters);
+ runtimeContext.counters = mergedCounters;
} catch (Exception err) {
// Don't fail execution due to counters - just don't print summary info
LOG.warn("Failed to get counters. Ignoring, summary info will be incomplete.", err);
- counters = null;
+ runtimeContext.counters = null;
}
// save useful commit information into query state, e.g. for custom commit hooks, like Iceberg
@@ -320,10 +333,10 @@ public class TezTask extends Task<TezWork> {
}
}
- if (LOG.isInfoEnabled() && counters != null
+ if (LOG.isInfoEnabled() && runtimeContext.counters != null
&& (HiveConf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_SUMMARY) ||
Utilities.isPerfOrAboveLogging(conf))) {
- for (CounterGroup group: counters) {
+ for (CounterGroup group : runtimeContext.counters) {
LOG.info(group.getDisplayName() +":");
for (TezCounter counter: group) {
LOG.info(" "+counter.getDisplayName()+": "+counter.getValue());
@@ -407,8 +420,8 @@ public class TezTask extends Task<TezWork> {
}
private void updateNumRows() {
- if (counters != null) {
- TezCounter counter = counters.findCounter(
+ if (runtimeContext.counters != null) {
+ TezCounter counter = runtimeContext.counters.findCounter(
conf.getVar(HiveConf.ConfVars.HIVE_COUNTER_GROUP),
FileSinkOperator.TOTAL_TABLE_ROWS_WRITTEN);
if (counter != null) {
queryState.setNumModifiedRows(counter.getValue());
@@ -671,6 +684,7 @@ public class TezTask extends Task<TezWork> {
}
perfLogger.perfLogEnd(CLASS_NAME, PerfLogger.TEZ_SUBMIT_DAG);
+ runtimeContext.init(sessionState.getSession());
return new SyncDagClient(dagClient);
}