tanishq-chugh commented on code in PR #6528:
URL: https://github.com/apache/hive/pull/6528#discussion_r3442196878
##########
ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java:
##########
@@ -146,23 +267,38 @@ public void childEvent(final CuratorFramework client,
final PathChildrenCacheEve
synchronized (lock) {
switch (event.getType()) {
- case CHILD_UPDATED, CHILD_ADDED:
- if (available.contains(applicationId) ||
taken.contains(applicationId)) {
- return; // We do not expect updates to existing sessions; ignore
them for now.
- }
- available.add(applicationId);
- break;
- case CHILD_REMOVED:
- if (taken.remove(applicationId)) {
- LOG.warn("The session in use has disappeared from the registry
({})", applicationId);
- } else if (!available.remove(applicationId)) {
- LOG.warn("An unknown session has been removed ({})",
applicationId);
- }
- break;
- default:
- // Ignore all the other events; logged above.
+ case CHILD_UPDATED, CHILD_ADDED:
+ if (available.contains(applicationId) ||
taken.contains(applicationId)) {
+ return; // We do not expect updates to existing sessions; ignore
them for now.
+ }
+ if (claimsCache.get(claimsPath + PATH_SEPARATOR +
applicationId).isPresent()) {
+ LOG.info("Ignoring newly added AM {} because it is already claimed
by another session.", applicationId);
Review Comment:
Addressed in
[cb98266](https://github.com/apache/hive/pull/6528/commits/cb982660acd6387477f6c49cca1a075ba3b5cb85)
##########
ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java:
##########
@@ -59,25 +70,92 @@ public ZookeeperExternalSessionsRegistryClient(final
HiveConf initConf) {
}
private static String getApplicationId(final ChildData childData) {
- return childData.getPath().substring(childData.getPath().lastIndexOf("/")
+ 1);
+ return
childData.getPath().substring(childData.getPath().lastIndexOf(PATH_SEPARATOR) +
1);
}
private void init() {
String zkServer = HiveConf.getVar(initConf,
ConfVars.HIVE_ZOOKEEPER_QUORUM);
+ int sessionTimeoutMs = (int) HiveConf.getTimeVar(initConf,
ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT,
+ TimeUnit.MILLISECONDS);
+ int connectionTimeoutMs = (int) HiveConf.getTimeVar(initConf,
ConfVars.HIVE_ZOOKEEPER_CONNECTION_TIMEOUT,
+ TimeUnit.MILLISECONDS);
+ int baseSleepTimeMs = (int) HiveConf.getTimeVar(initConf,
ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME,
+ TimeUnit.MILLISECONDS);
+ int maxRetries = HiveConf.getIntVar(initConf,
ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES);
String zkNamespace = HiveConf.getVar(initConf,
ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_NAMESPACE);
String effectivePath = normalizeZkPath(zkNamespace);
- CuratorFramework client = CuratorFrameworkFactory.newClient(zkServer, new
ExponentialBackoffRetry(1000, 3));
+ this.claimsPath = effectivePath + "-claims";
+ // After connection state changes to SUSPENDED, the client has already
consumed ~2/3 of the negotiated session
+ // timeout. Use 33% of the remaining window so LOST aligns with when the
ZK server expires the session and drops
+ // ephemeral claim nodes. For Ref: Curator TN14
+ this.client = CuratorFrameworkFactory.builder()
+ .connectString(zkServer)
+ .sessionTimeoutMs(sessionTimeoutMs)
+ .connectionTimeoutMs(connectionTimeoutMs)
+ .simulatedSessionExpirationPercent(33)
+ .retryPolicy(new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries))
+ .build();
+
synchronized (lock) {
client.start();
+
+ client.getConnectionStateListenable().addListener((curatorClient,
newState) -> {
+ if (newState == ConnectionState.CONNECTED || newState ==
ConnectionState.RECONNECTED) {
+ zkConnectionHealthy = true;
+ } else if (newState == ConnectionState.LOST) {
+ zkConnectionHealthy = false;
+ Set<String> sessionsToKill;
+ synchronized (lock) {
+ LOG.error("ZK connection state has changed to lost; killing
running DAGs on claimed AMs: {}", taken);
+ sessionsToKill = new HashSet<>(taken);
+ taken.clear();
+ }
+ sessionsToKill.forEach(TezJobMonitor::killRunningDAGsForApplication);
+ }
+ });
Review Comment:
Addressed in
[cb98266](https://github.com/apache/hive/pull/6528/commits/cb982660acd6387477f6c49cca1a075ba3b5cb85)
##########
ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestZookeeperExternalSessionsRegistryClient.java:
##########
@@ -128,5 +138,141 @@ public void testReuseSameSession() throws Exception {
}
}
}
+
+ /**
+ * Tests that multiple registry clients (simulating multiple HS2 instances)
+ * respect the global distributed lock (claims) and do not claim the same
session simultaneously.
+ */
+ @Test
+ public void testSessionClaimsFromDifferentRegistryClients() throws Exception
{
+ CuratorFramework client = null;
+ ZookeeperExternalSessionsRegistryClient registry1 = null;
+ ZookeeperExternalSessionsRegistryClient registry2 = null;
+
+ try (TestingServer server = new TestingServer()) {
+ String connectString = server.getConnectString();
+
+ HiveConf conf = new HiveConf();
+ conf.setVar(ConfVars.HIVE_ZOOKEEPER_QUORUM, connectString);
+ conf.setVar(ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_NAMESPACE,
"/tez_ns_concurrent");
+
Review Comment:
Addressed in
[cb98266](https://github.com/apache/hive/pull/6528/commits/cb982660acd6387477f6c49cca1a075ba3b5cb85)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]