mehakmeet commented on a change in pull request #2396:
URL: https://github.com/apache/hadoop/pull/2396#discussion_r514165609



##########
File path: 
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
##########
@@ -3517,33 +3546,86 @@ private FileSystem getInternal(URI uri, Configuration conf, Key key)
       if (fs != null) {
         return fs;
       }
-
-      fs = createFileSystem(uri, conf);
-      final long timeout = conf.getTimeDuration(SERVICE_SHUTDOWN_TIMEOUT,
-          SERVICE_SHUTDOWN_TIMEOUT_DEFAULT,
-          ShutdownHookManager.TIME_UNIT_DEFAULT);
-      synchronized (this) { // refetch the lock again
-        FileSystem oldfs = map.get(key);
-        if (oldfs != null) { // a file system is created while lock is releasing
-          fs.close(); // close the new file system
-          return oldfs;  // return the old file system
-        }
-
-        // now insert the new file system into the map
-        if (map.isEmpty()
-                && !ShutdownHookManager.get().isShutdownInProgress()) {
-          ShutdownHookManager.get().addShutdownHook(clientFinalizer,
-              SHUTDOWN_HOOK_PRIORITY, timeout,
-              ShutdownHookManager.TIME_UNIT_DEFAULT);
+      // fs not yet created, acquire lock
+      // to construct an instance.
+      try (DurationInfo d =
+              new DurationInfo(LOGGER, false, "Acquiring creator semaphore for %s",
+                  uri)) {
+        creatorPermits.acquire();
+      } catch (InterruptedException e) {
+        // acquisition was interrupted; convert to an IOE.
+        throw (IOException)new InterruptedIOException(e.toString())
+            .initCause(e);
+      }
+      FileSystem fsToClose = null;
+      try {
+        // See if FS was instantiated by another thread while waiting
+        // for the permit.
+        synchronized (this) {
+          fs = map.get(key);
         }
-        fs.key = key;
-        map.put(key, fs);
-        if (conf.getBoolean(
-            FS_AUTOMATIC_CLOSE_KEY, FS_AUTOMATIC_CLOSE_DEFAULT)) {
-          toAutoClose.add(key);
+        if (fs != null) {
+          LOGGER.debug("Filesystem {} created while awaiting semaphore", uri);
+          return fs;
         }
-        return fs;
+        // create the filesystem
+        fs = createFileSystem(uri, conf);
+        final long timeout = conf.getTimeDuration(SERVICE_SHUTDOWN_TIMEOUT,
+            SERVICE_SHUTDOWN_TIMEOUT_DEFAULT,
+            ShutdownHookManager.TIME_UNIT_DEFAULT);
+        // any FS to close outside of the synchronized section
+        synchronized (this) { // lock on the Cache object
+
+          // see if there is now an entry for the FS, which happens
+          // if another thread's creation overlapped with this one.
+          FileSystem oldfs = map.get(key);
+          if (oldfs != null) {
+            // a file system was created in a separate thread.
+            // save the FS reference to close outside all locks,
+            // and switch to returning the oldFS
+            fsToClose = fs;
+            fs = oldfs;
+          } else {
+            // register the clientFinalizer if needed and shutdown isn't
+            // already active
+            if (map.isEmpty()
+                && !ShutdownHookManager.get().isShutdownInProgress()) {
+              ShutdownHookManager.get().addShutdownHook(clientFinalizer,
+                  SHUTDOWN_HOOK_PRIORITY, timeout,
+                  ShutdownHookManager.TIME_UNIT_DEFAULT);
+            }
+            // insert the new file system into the map
+            fs.key = key;
+            map.put(key, fs);
+            if (conf.getBoolean(
+                FS_AUTOMATIC_CLOSE_KEY, FS_AUTOMATIC_CLOSE_DEFAULT)) {
+              toAutoClose.add(key);
+            }
+          }
+        } // end of synchronized block
+      } finally {
+        // release the creator permit.
+        creatorPermits.release();
+      }
+      if (fsToClose != null) {
+        LOGGER.debug("Duplicate FS created for {}; discarding {}",
+            uri, fs);
+        discardedInstances.incrementAndGet();
+        // close the new file system
+        // note this will briefly remove and reinstate "fsToClose" from
+        // the map. It is done in a synchronized block so will not be
+        // visible to others.
+        fsToClose.close();
       }
+      return fs;
+    }
+
+    /**
+     * Get the count of discarded instances.
+     * @return the new instance.
+     */
+    long getDiscardedInstances() {

Review comment:
       Should we have this as @VisibleForTesting?

##########
File path: 
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFileSystemCaching.java
##########
@@ -336,4 +344,134 @@ public void testCacheIncludesURIUserInfo() throws Throwable {
     assertNotEquals(keyA, new FileSystem.Cache.Key(
         new URI("wasb://a:passw...@account.blob.core.windows.net"), conf));
   }
+
+
+  /**
+   * Single semaphore: no surplus FS instances will be created
+   * and then discarded.
+   */
+  @Test
+  public void testCacheSingleSemaphoredConstruction() throws Exception {
+    FileSystem.Cache cache = semaphoredCache(1);
+    createFileSystems(cache, 10);
+    Assertions.assertThat(cache.getDiscardedInstances())
+        .describedAs("Discarded FS instances")
+        .isEqualTo(0);
+  }
+
+  /**
+   * Dual semaphore: thread 2 will get as far as
+   * blocking in the initialize() method while awaiting
+   * thread 1 to complete its initialization.
+   * <p></p>
+   * The thread 2 FS instance will be discarded.
+   * All other threads will block for a cache semaphore,
+   * so when they are given an opportunity to proceed,
+   * they will find that an FS instance exists.
+   */
+  @Test
+  public void testCacheDualSemaphoreConstruction() throws Exception {
+    FileSystem.Cache cache = semaphoredCache(2);
+    createFileSystems(cache, 10);
+    Assertions.assertThat(cache.getDiscardedInstances())
+        .describedAs("Discarded FS instances")
+        .isEqualTo(1);
+  }
+
+  /**
+   * Construct the FS instances in a cache with effectively no
+   * limit on the number of instances which can be created
+   * simultaneously.
+   * <p></p>
+   * This is the effective state before HADOOP-17313.
+   * <p></p>
+   * All but one thread's FS instance will be discarded.
+   */
+  @Test
+  public void testCacheLargeSemaphoreConstruction() throws Exception {
+    FileSystem.Cache cache = semaphoredCache(999);
+    int count = 10;
+    createFileSystems(cache, count);
+    Assertions.assertThat(cache.getDiscardedInstances())
+        .describedAs("Discarded FS instances")
+        .isEqualTo(count -1);
+  }
+
+  /**
+   * Create a cache with a given semaphore size.
+   * @param semaphores number of semaphores
+   * @return the cache.
+   */
+  private FileSystem.Cache semaphoredCache(final int semaphores) {
+    final Configuration conf1 = new Configuration();
+    conf1.setInt(FS_CREATION_PARALLEL_COUNT, semaphores);
+    FileSystem.Cache cache = new FileSystem.Cache(conf1);
+    return cache;
+  }
+
+  /**
+   * Attempt to create {@code count} filesystems in parallel,
+   * then assert that they are all equal.
+   * @param cache cache to use
+   * @param count count of filesystems to instantiate
+   */
+  private void createFileSystems(final FileSystem.Cache cache, final int count)
+      throws URISyntaxException, InterruptedException,
+             java.util.concurrent.ExecutionException {
+    final Configuration conf = new Configuration();
+    conf.set("fs.blocking.impl", BlockingInitializer.NAME);
+    // only one instance can be created at a time.
+    URI uri = new URI("blocking://a");
+    ListeningExecutorService pool =
+        BlockingThreadPoolExecutorService.newInstance(count * 2, 0,
+            10, TimeUnit.SECONDS,
+            "creation-threads");
+
+    // submit a set of requests to create an FS instance.
+    // the semaphore will block all but one, and that will block until
+    // it is allowed to continue
+    List<ListenableFuture<FileSystem>> futures = new ArrayList<>(count);
+
+    // acquire the semaphore so blocking all FS instances from
+    // being initialized.
+    Semaphore semaphore = BlockingInitializer.sem;
+    semaphore.acquire();
+
+    // su
+    for (int i = 0; i < count; i++) {
+      futures.add(pool.submit(
+          () -> cache.get(uri, conf)));
+    }
+    // now let all blocked initializers free
+    semaphore.release();
+    // get that first FS
+    FileSystem createdFS = futures.get(0).get();
+    // verify all the others are the same instance
+    for (int i = 1; i < count; i++) {
+      FileSystem fs = futures.get(i).get();
+      Assertions.assertThat(fs)
+          .isSameAs(createdFS);
+    }
+  }
+
+  /**
+   * An FS which blocks in initialize() until it can aquire the shared

Review comment:
       typo: "aquire" -> "acquire"




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org

Reply via email to