ayushtkn commented on code in PR #6528:
URL: https://github.com/apache/hive/pull/6528#discussion_r3371510400


##########
ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java:
##########
@@ -91,22 +110,47 @@ static String normalizeZkPath(String zkNamespace) {
 
   @Override
   public String getSession() throws Exception {
-    synchronized (lock) {
-      if (!isInitialized) {
-        init();
-      }
-      long endTimeNs = System.nanoTime() + (1000000000L * maxAttempts);
-      while (available.isEmpty() && ((endTimeNs - System.nanoTime()) > 0)) {
-        lock.wait(1000L);
+    if (!isInitialized) {
+      synchronized (lock) {
+        if (!isInitialized) {
+          init();
+        }
       }
-      Iterator<String> iter = available.iterator();
-      if (!iter.hasNext()) {
-        throw new IOException("Cannot get a session after " + maxAttempts + " attempts");
+    }
+    
+    long endTimeNs = System.nanoTime() + (1000000000L * maxAttempts);
+    long queueWaitTimeMs = Math.max(0, (endTimeNs - System.nanoTime()) / 1000000L);
+    if (!globalQueue.acquire(queueWaitTimeMs, TimeUnit.MILLISECONDS)) {
+      throw new IOException("Cannot get a session (timed out in queue) after " + maxAttempts + " seconds");
+    }
+
+    try {
+      synchronized (lock) {
+        while (System.nanoTime() < endTimeNs) {

Review Comment:
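   This is wrong; it's a known anti-pattern. `System.nanoTime() < endTimeNs` will break if `endTimeNs` overflows and goes negative. Compare the difference instead, e.g. `endTimeNs - System.nanoTime() > 0`, as the `System.nanoTime()` Javadoc recommends.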
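   A minimal sketch of the overflow-safe pattern, assuming a monitor-based wait like the one in `getSession()`. The names `DeadlineWaitSketch`, `beforeDeadline`, `offer` and `poll` are hypothetical, and a `Deque<String>` stands in for the real `available` set:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the registry client's lock/available bookkeeping.
final class DeadlineWaitSketch {
  private final Object lock = new Object();
  private final Deque<String> available = new ArrayDeque<>();

  // Overflow-safe: correct as long as the two readings are < 2^63 ns apart,
  // even if deadlineNs itself wrapped around to a negative value.
  static boolean beforeDeadline(long nowNs, long deadlineNs) {
    return deadlineNs - nowNs > 0;
  }

  void offer(String appId) {
    synchronized (lock) {
      available.add(appId);
      lock.notifyAll();
    }
  }

  // Returns an app id, or null if none became available before the timeout.
  String poll(long timeoutSeconds) throws InterruptedException {
    long deadlineNs = System.nanoTime() + TimeUnit.SECONDS.toNanos(timeoutSeconds);
    synchronized (lock) {
      while (available.isEmpty()) {
        long nowNs = System.nanoTime();
        if (!beforeDeadline(nowNs, deadlineNs)) {
          return null; // timed out
        }
        // Object.wait(0) means "wait forever", so always wait at least 1 ms.
        long waitMs = Math.max(1L, TimeUnit.NANOSECONDS.toMillis(deadlineNs - nowNs));
        lock.wait(waitMs);
      }
      return available.poll();
    }
  }
}
```

   Passing `maxAttempts` as `timeoutSeconds` would keep the current meaning of the setting, since the patch already reports it as a number of seconds in its timeout message.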



##########
ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java:
##########
@@ -117,18 +161,33 @@ public void returnSession(String appId) {
         throw new IllegalStateException("Not initialized");
       }
       if (!taken.remove(appId)) {
-        return; // Session has been removed from ZK.
+        return; // Session has already been removed from ZK.
       }
+
+      try {
+        client.delete().guaranteed().forPath(claimsPath + "/" + appId);
+      } catch (KeeperException.NoNodeException e) {
+        // If the claim Node has already been deleted, we can ignore it.

Review Comment:
   Can you add a debug log here?



##########
ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java:
##########
@@ -165,4 +225,34 @@ public void childEvent(final CuratorFramework client, final PathChildrenCacheEve
       }
     }
   }
+
+  private final class ClaimsPathListener implements PathChildrenCacheListener {
+    @Override
+    public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) {
+      ChildData childData = event.getData();
+      if (childData == null) {
+        return;
+      }
+
+      String applicationId = getApplicationId(childData);
+      synchronized (lock) {
+        switch (event.getType()) {
+          case CHILD_REMOVED:
+            if (!taken.contains(applicationId)) {
+              // if the claim node was released by this particular HS2 itself,
+              // it will be added back to the available list & locks are notified as part of returnSession()
+              available.add(applicationId);
+              lock.notifyAll();
+            }
+            break;
+          case CHILD_ADDED:
+            // A Tez AM was claimed by another HS2, so remove the AM from the available list of this particular HS2
+            available.remove(applicationId);
+            break;

Review Comment:
   Curious about the connection events: if we lose the connection and then `CONNECTION_RECONNECTED` is sent, are we sure the cache will replay all the missed events and our state will be correct?
   
   I'm even more curious about the connection-lost case. Suppose the network was down longer than the session timeout: ZooKeeper deletes all of your ephemeral claim nodes. If you don't handle LOST, your local `taken` set will think it still owns the Tez AMs, but other HiveServer2 instances will see the nodes disappear and claim them right out from under you.
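   A minimal, self-contained sketch of what handling these states could look like. Assumptions: the `ConnState` enum is a stand-in that mirrors the constant names of Curator's `ConnectionState` (real code would implement a `ConnectionStateListener` registered via `client.getConnectionStateListenable()`), and `allSessions`/`allClaims` are hypothetical fresh reads of the sessions and claims paths. Rather than trusting replayed cache events, it rebuilds `available` from a snapshot on reconnect:

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of the registry's local state; not Hive's actual class.
final class ClaimStateSketch {
  // Stand-in mirroring the constant names of Curator's ConnectionState.
  enum ConnState { CONNECTED, SUSPENDED, RECONNECTED, LOST, READ_ONLY }

  private final Object lock = new Object();
  final Set<String> available = new HashSet<>();
  final Set<String> taken = new HashSet<>();

  // allSessions/allClaims: fresh reads of the sessions and claims znodes,
  // only consulted on RECONNECTED.
  void stateChanged(ConnState state, Collection<String> allSessions,
      Collection<String> allClaims) {
    synchronized (lock) {
      switch (state) {
        case LOST:
          // The ZK session expired, so our ephemeral claim nodes are gone and
          // another HS2 may already own these AMs: drop local ownership.
          taken.clear();
          available.clear();
          break;
        case RECONNECTED:
          // Don't rely on replayed events: rebuild from a snapshot.
          // Unclaimed sessions are available; claimed ones (ours or others') are not.
          available.clear();
          available.addAll(allSessions);
          available.removeAll(allClaims);
          lock.notifyAll();
          break;
        default:
          // CONNECTED / SUSPENDED / READ_ONLY: local state left unchanged here.
          break;
      }
    }
  }
}
```

   Whether in-flight queries should keep using an AM whose claim was lost is a separate policy decision that this sketch does not address.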



##########
ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestZookeeperExternalSessionsRegistryClient.java:
##########
@@ -128,5 +134,118 @@ public void testReuseSameSession() throws Exception {
       }
     }
   }
+
+  /**
+   * Tests that multiple registry clients (simulating multiple HS2 instances)
+   * respect the global distributed lock (claims) and do not claim the same session simultaneously.
+   */
+  @Test
+  public void testSessionClaimsFromDifferentRegistryClients() throws Exception {
+    CuratorFramework client = null;
+    ZookeeperExternalSessionsRegistryClient registry1 = null;
+    ZookeeperExternalSessionsRegistryClient registry2 = null;
+
+    try (TestingServer server = new TestingServer()) {
+      String connectString = server.getConnectString();
+
+      HiveConf conf = new HiveConf();
+      conf.setVar(ConfVars.HIVE_ZOOKEEPER_QUORUM, connectString);
+      conf.setVar(ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_NAMESPACE, "/tez_ns_concurrent");
+      conf.setIntVar(ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_WAIT_MAX_ATTEMPTS, 5);
+
+      String namespace = HiveConf.getVar(conf, ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_NAMESPACE);
+      String effectivePath = ZookeeperExternalSessionsRegistryClient.normalizeZkPath(namespace);
+
+      CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
+      client = builder.connectString(connectString).retryPolicy(new RetryOneTime(1)).build();
+      client.start();
+
+      client.create().creatingParentsIfNeeded().forPath(effectivePath + "/app_1");
+      client.create().forPath(effectivePath + "/app_2");
+
+      registry1 = new ZookeeperExternalSessionsRegistryClient(conf);
+      registry2 = new ZookeeperExternalSessionsRegistryClient(conf);
+
+      String sessionFromRegistry1 = registry1.getSession();
+      String sessionFromRegistry2 = registry2.getSession();
+
+      assertNotNull("Registry 1 should have claimed a session", sessionFromRegistry1);
+      assertNotNull("Registry 2 should have claimed a session", sessionFromRegistry2);
+
+      assertNotEquals("The two registries should claim different sessions!",
+          sessionFromRegistry1, sessionFromRegistry2);
+
+      registry1.returnSession(sessionFromRegistry1);
+
+      String session3FromRegistry2 = registry2.getSession();
+      assertEquals("Registry 2 should be able to claim the newly released session",
+          sessionFromRegistry1, session3FromRegistry2);
+
+      registry2.returnSession(sessionFromRegistry2);
+      registry2.returnSession(session3FromRegistry2);
+    } finally {
+      if (registry1 != null) {
+        registry1.close();
+      }
+      if (registry2 != null) {
+        registry2.close();
+      }
+      if (client != null) {
+        client.close();
+      }
+    }
+  }
+
+  /**
+   * Tests that the InterProcessMutex enforces strict Global FIFO ordering.
+   * Clients form a queue when no sessions are available, and are served in exact order.
+   */
+  @Test
+  public void testFIFOSessionClaimsFromDifferentRegistries() throws Exception {
+    try (TestingServer server = new TestingServer()) {
+      String connectString = server.getConnectString();
+
+      HiveConf conf = new HiveConf();
+      conf.setVar(ConfVars.HIVE_ZOOKEEPER_QUORUM, connectString);
+      conf.setVar(ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_NAMESPACE, "/tez_ns_fifo");
+      conf.setIntVar(ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_WAIT_MAX_ATTEMPTS, 15);
+
+      String namespace = HiveConf.getVar(conf, ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_NAMESPACE);
+      String effectivePath = ZookeeperExternalSessionsRegistryClient.normalizeZkPath(namespace);
+
+      CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
+      CuratorFramework client = builder.connectString(connectString).retryPolicy(new RetryOneTime(1)).build();
+      client.start();
+
+      ExecutorService executor = Executors.newFixedThreadPool(3);
+      ZookeeperExternalSessionsRegistryClient registry1 = new ZookeeperExternalSessionsRegistryClient(conf);
+      ZookeeperExternalSessionsRegistryClient registry2 = new ZookeeperExternalSessionsRegistryClient(conf);
+      ZookeeperExternalSessionsRegistryClient registry3 = new ZookeeperExternalSessionsRegistryClient(conf);
+      try {
+        Future<String> future1 = executor.submit(registry1::getSession);
+        Thread.sleep(500);

Review Comment:
   This will lead to flaky behaviour, and it doesn't guarantee FIFO ordering either. Ideally this should use latches.
   
   Something like:
   ```
   CountDownLatch r1Started = new CountDownLatch(1);
   CountDownLatch r2Started = new CountDownLatch(1);
   
   Future<String> future1 = executor.submit(() -> {
     r1Started.countDown();
     return registry1.getSession();
   });
   
   r1Started.await();
   
   Future<String> future2 = executor.submit(() -> {
     r2Started.countDown();
     return registry2.getSession();
   });
   
   r2Started.await();
   ```
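   The latch pattern above could be factored into a small helper so each submission in the test stays a single line. A minimal sketch; `LatchedSubmit` and `submitAndAwaitStart` are hypothetical names, not existing Hive test utilities:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

// Hypothetical test helper wrapping the "latch per submission" pattern.
final class LatchedSubmit {
  // Submits the task and blocks until a worker thread has started running it.
  // Caveat: this only orders thread start; it cannot prove the task has
  // already entered a lock queue inside task.call().
  static <T> Future<T> submitAndAwaitStart(ExecutorService executor, Callable<T> task)
      throws InterruptedException {
    CountDownLatch started = new CountDownLatch(1);
    Future<T> future = executor.submit(() -> {
      started.countDown();
      return task.call();
    });
    started.await();
    return future;
  }
}
```

   In the FIFO test this would replace each `submit` + `Thread.sleep(500)` pair, e.g. `Future<String> future1 = LatchedSubmit.submitAndAwaitStart(executor, registry1::getSession);`.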



##########
ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestZookeeperExternalSessionsRegistryClient.java:
##########
@@ -128,5 +134,118 @@ public void testReuseSameSession() throws Exception {
       }
     }
   }
+  @Test
+  public void testFIFOSessionClaimsFromDifferentRegistries() throws Exception {
+    try (TestingServer server = new TestingServer()) {
+      String connectString = server.getConnectString();
+
+      HiveConf conf = new HiveConf();
+      conf.setVar(ConfVars.HIVE_ZOOKEEPER_QUORUM, connectString);
+      conf.setVar(ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_NAMESPACE, "/tez_ns_fifo");
+      conf.setIntVar(ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_WAIT_MAX_ATTEMPTS, 15);

Review Comment:
   How did you arrive at the magic number 15 here and 5 above? The default is 60; why do we need to change it?



##########
ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java:
##########
@@ -165,4 +225,34 @@ public void childEvent(final CuratorFramework client, final PathChildrenCacheEve
       }
     }
   }
+
+  private final class ClaimsPathListener implements PathChildrenCacheListener {

Review Comment:
   I'm wondering whether we need this class at all. Would something like this be possible?
   ```
   CuratorCacheListener claimsListener = CuratorCacheListener.builder()
       .forCreates(childData -> {
         if (childData == null) {
           return;
         }
         String applicationId = getApplicationId(childData);
         synchronized (lock) {
           available.remove(applicationId);
         }
       })
       .forDeletes(childData -> {
         if (childData == null) {
           return;
         }
         String applicationId = getApplicationId(childData);
         synchronized (lock) {
           if (!taken.contains(applicationId)) {
             available.add(applicationId);
             lock.notifyAll();
           }
         }
       })
       .build();
   ```



##########
ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezExternalSessionState.java:
##########
@@ -133,6 +134,11 @@ public void close(boolean keepDagFilesDir) throws Exception {
     // We never close external sessions that don't have errors.
     try {
       if (externalAppId != null) {
+        LOG.info("Returning external session with appID: {}", externalAppId);

Review Comment:
   Too much information :-). Please change it to debug.


