Copilot commented on code in PR #6528:
URL: https://github.com/apache/hive/pull/6528#discussion_r3428417585
##########
ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java:
##########
@@ -353,6 +353,20 @@ void returnSession(TezSession tezSessionState) {
+ " belongs to the pool. Put it back in");
defaultSessionPool.returnSession((TezSessionPoolSession)tezSessionState);
}
+
+ if (useExternalSessions) {
+ if (tezSessionState.getTezClient() != null
+ && tezSessionState.getTezClient().getAppMasterApplicationId() != null) {
+ try {
+ tezSessionState.close(false);
Review Comment:
Calling `tezSessionState.close(false)` here runs after the session may
already have been returned to `defaultSessionPool` above, which can race with
another thread borrowing the same pooled session. It can also leave the session
eligible for reuse while its external session claim has been released, allowing
DAGs to be submitted against an AM that may now be concurrently assigned to
another HS2 instance.
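One way to avoid this race is to finish all teardown before the session becomes visible to other threads, and to keep a session whose external claim was released out of the pool entirely. The sketch below is hypothetical. `PooledSession`, `SessionPool`, and `releaseExternalClaim()` are illustrative names, not Hive's `TezSessionPoolManager` or `TezSessionPoolSession` API.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for a pooled session backed by an external Tez AM.
class PooledSession {
    private boolean claimHeld = true;   // true while this HS2 owns the AM claim
    private boolean open = true;

    synchronized void releaseExternalClaim() {
        claimHeld = false;
        open = false;                   // a session without its claim must not be reused
    }

    synchronized boolean isReusable() {
        return open && claimHeld;
    }
}

// Hypothetical pool illustrating "tear down first, publish last".
class SessionPool {
    private final BlockingQueue<PooledSession> available = new LinkedBlockingQueue<>();

    PooledSession borrow() throws InterruptedException {
        return available.take();
    }

    // Release the external claim first, then decide whether to re-pool.
    // Once the session is in the queue another thread may borrow it,
    // so nothing may mutate it after offer().
    void returnSession(PooledSession s, boolean releaseClaim) {
        if (releaseClaim) {
            s.releaseExternalClaim();
        }
        if (s.isReusable()) {
            available.offer(s);
        }
        // Otherwise the session is dropped; a real pool would replace it.
    }

    int availableCount() {
        return available.size();
    }
}
```

The design choice is that the pool never holds a session whose claim has been released, so a borrower cannot submit DAGs against an AM that another HS2 instance may have picked up.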
##########
ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezExternalSessionState.java:
##########
@@ -181,4 +195,67 @@ public boolean killQuery(String reason) throws HiveException {
killQuery.killQuery(queryId, reason, conf, false);
return true;
}
+
+ @Override
+ public DAGClient submitDAG(DAG dag) throws TezException, IOException {
+ try {
+ return getTezClient().submitDAG(dag);
+ } catch (TezException e) {
+ if (e.getMessage() == null || !e.getMessage().contains("App master already running a DAG")) {
+ throw e;
+ }
+ tryKillRunningDAGs(getTezClient());
Review Comment:
Branching on a substring match of the TezException message ("App master
already running a DAG") is brittle across Tez versions/localization and can
cause missed detection (no cleanup) or false positives (killing unrelated DAGs)
if the wording changes.
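A less brittle option may be to ask the AM for its state after a failed submit instead of parsing the message. Tez's `TezClient.getAppMasterStatus()` reports a `TezAppMasterStatus` that includes a `RUNNING` value. The sketch below assumes that signal is adequate for this purpose. `AmStatus` is a local enum mirroring `TezAppMasterStatus`, and `AmClient` and `DagSubmitter` are hypothetical names used only so the example compiles on its own.

```java
// Local stand-in mirroring org.apache.tez.client.TezAppMasterStatus.
enum AmStatus { INITIALIZING, READY, RUNNING, SHUTDOWN }

// Hypothetical abstraction over the Tez client calls used below.
interface AmClient {
    String submit(String dagName) throws Exception;   // returns a DAG id
    AmStatus status() throws Exception;
    void killRunningDags() throws Exception;
}

final class DagSubmitter {
    private DagSubmitter() {}

    // Submit once; if that fails, consult the AM state instead of the
    // exception text. Only when the AM reports RUNNING are the running
    // DAGs killed and the submission retried, and only once.
    static String submitWithRecovery(AmClient client, String dagName) throws Exception {
        try {
            return client.submit(dagName);
        } catch (Exception e) {
            if (client.status() != AmStatus.RUNNING) {
                throw e;                 // unrelated failure: surface it unchanged
            }
            client.killRunningDags();
            return client.submit(dagName);
        }
    }
}
```

Because the status is read after the failed submit, a DAG that finishes in between results in a rethrow rather than a kill, which errs on the side of not killing anything.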
##########
ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java:
##########
@@ -499,6 +499,20 @@ public static void killRunningJobs() {
}
}
+ public static void killRunningDAGsForApplication(String applicationId) {
+ synchronized (shutdownList) {
+ for (DAGClient c : shutdownList) {
+ try {
+ if (applicationId.equals(c.getSessionIdentifierString())) {
+ c.tryKillDAG();
+ }
+ } catch (Exception e) {
+ LOG.error("Error while trying to kill running DAG on tez session {}", applicationId);
+ }
Review Comment:
The exception is swallowed without logging the stack trace, which makes
diagnosing failures in DAG termination very difficult (and can hide repeated
kill failures).
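A minimal sketch of the same loop that keeps the stack trace and counts failures, assuming `java.util.logging` so it compiles without extra dependencies. `DagHandle` and `DagKiller` are hypothetical names standing in for Tez's `DAGClient` and the `TezJobMonitor` helper.

```java
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical handle standing in for Tez's DAGClient in this sketch.
interface DagHandle {
    String sessionId();
    void tryKill() throws Exception;
}

final class DagKiller {
    private static final Logger LOG = Logger.getLogger(DagKiller.class.getName());

    private DagKiller() {}

    // Attempts to kill every DAG belonging to applicationId and returns the
    // number of failed attempts. Each failure is logged with its stack trace,
    // and one failure does not stop the remaining kills.
    static int killForApplication(List<DagHandle> handles, String applicationId) {
        int failures = 0;
        synchronized (handles) {
            for (DagHandle h : handles) {
                if (!applicationId.equals(h.sessionId())) {
                    continue;
                }
                try {
                    h.tryKill();
                } catch (Exception e) {
                    failures++;
                    // Passing the Throwable makes the logger record the full stack trace.
                    LOG.log(Level.SEVERE,
                            "Error while trying to kill running DAG on Tez session " + applicationId, e);
                }
            }
        }
        return failures;
    }
}
```

With SLF4J, as used in the diff, the equivalent is to pass the exception as the last argument after the placeholder values, for example `LOG.error("... {}", applicationId, e)`; SLF4J treats a trailing `Throwable` as the exception to log rather than as a format argument.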
--
This is an automated message from the Apache Git Service.