[ 
https://issues.apache.org/jira/browse/HADOOP-17313?focusedWorklogId=506281&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-506281
 ]

ASF GitHub Bot logged work on HADOOP-17313:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 29/Oct/20 16:48
            Start Date: 29/Oct/20 16:48
    Worklog Time Spent: 10m 
      Work Description: steveloughran commented on a change in pull request #2396:
URL: https://github.com/apache/hadoop/pull/2396#discussion_r514411189



##########
File path: hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFileSystemCaching.java
##########
@@ -336,4 +344,134 @@ public void testCacheIncludesURIUserInfo() throws Throwable {
     assertNotEquals(keyA, new FileSystem.Cache.Key(
         new URI("wasb://a:passw...@account.blob.core.windows.net"), conf));
   }
+
+
+  /**
+   * Single semaphore: no surplus FS instances will be created
+   * and then discarded.
+   */
+  @Test
+  public void testCacheSingleSemaphoredConstruction() throws Exception {
+    FileSystem.Cache cache = semaphoredCache(1);
+    createFileSystems(cache, 10);
+    Assertions.assertThat(cache.getDiscardedInstances())
+        .describedAs("Discarded FS instances")
+        .isEqualTo(0);
+  }
+
+  /**
+   * Dual semaphore: thread 2 will get as far as
+   * blocking in the initialize() method while awaiting
+   * thread 1 to complete its initialization.
+   * <p></p>
+   * The thread 2 FS instance will be discarded.
+   * All other threads will block for a cache semaphore,
+   * so when they are given an opportunity to proceed,
+   * they will find that an FS instance exists.
+   */
+  @Test
+  public void testCacheDualSemaphoreConstruction() throws Exception {
+    FileSystem.Cache cache = semaphoredCache(2);
+    createFileSystems(cache, 10);
+    Assertions.assertThat(cache.getDiscardedInstances())
+        .describedAs("Discarded FS instances")
+        .isEqualTo(1);
+  }
+
+  /**
+   * Construct the FS instances in a cache with effectively no
+   * limit on the number of instances which can be created
+   * simultaneously.
+   * <p></p>
+   * This is the effective state before HADOOP-17313.
+   * <p></p>
+   * All but one thread's FS instance will be discarded.
+   */
+  @Test
+  public void testCacheLargeSemaphoreConstruction() throws Exception {
+    FileSystem.Cache cache = semaphoredCache(999);
+    int count = 10;
+    createFileSystems(cache, count);
+    Assertions.assertThat(cache.getDiscardedInstances())
+        .describedAs("Discarded FS instances")
+        .isEqualTo(count - 1);
+  }
+
+  /**
+   * Create a cache with a given semaphore size.
+   * @param semaphores number of semaphores
+   * @return the cache.
+   */
+  private FileSystem.Cache semaphoredCache(final int semaphores) {
+    final Configuration conf1 = new Configuration();
+    conf1.setInt(FS_CREATION_PARALLEL_COUNT, semaphores);
+    FileSystem.Cache cache = new FileSystem.Cache(conf1);
+    return cache;
+  }
+
+  /**
+   * Attempt to create {@code count} filesystems in parallel,
+   * then assert that they are all equal.
+   * @param cache cache to use
+   * @param count count of filesystems to instantiate
+   */
+  private void createFileSystems(final FileSystem.Cache cache, final int count)
+      throws URISyntaxException, InterruptedException,
+             java.util.concurrent.ExecutionException {
+    final Configuration conf = new Configuration();
+    conf.set("fs.blocking.impl", BlockingInitializer.NAME);
+    // only one instance can be created at a time.
+    URI uri = new URI("blocking://a");
+    ListeningExecutorService pool =
+        BlockingThreadPoolExecutorService.newInstance(count * 2, 0,
+            10, TimeUnit.SECONDS,
+            "creation-threads");
+
+    // submit a set of requests to create an FS instance.
+    // the semaphore will block all but one, and that will block until
+    // it is allowed to continue
+    List<ListenableFuture<FileSystem>> futures = new ArrayList<>(count);
+
+    // acquire the semaphore, blocking all FS instances from
+    // being initialized.
+    Semaphore semaphore = BlockingInitializer.sem;
+    semaphore.acquire();
+
+    // su

Review comment:
       cut




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 506281)
    Time Spent: 2h  (was: 1h 50m)

> FileSystem.get to support slow-to-instantiate FS clients
> --------------------------------------------------------
>
>                 Key: HADOOP-17313
>                 URL: https://issues.apache.org/jira/browse/HADOOP-17313
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs, fs/azure, fs/s3
>    Affects Versions: 3.3.0
>            Reporter: Steve Loughran
>            Assignee: Steve Loughran
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 2h
>  Remaining Estimate: 0h
>
> A recurrent problem in processes with many worker threads (Hive, Spark, etc.) 
> is that calling `FileSystem.get(URI-to-object-store)` triggers the creation 
> and then the discarding of many FS clients: all but one for the same URL. As 
> well as the direct performance hit, this can exacerbate locking problems and 
> make instantiation a lot slower than it would otherwise be.
> This has been observed with the S3A and ABFS connectors.
> The ultimate solution here would probably be something more complicated to 
> ensure that only one thread was ever creating a connector for a given URL; 
> the rest would wait for it to be initialized. This would (a) reduce 
> contention and CPU, IO and network load, and (b) reduce the time for all but 
> the first thread to resume processing to that of the remaining time in 
> .initialize(). This would also benefit the S3A connector.
> We'd need something like
> # A (per-user) map of filesystems being created <URI, FileSystem>
> # split createFileSystem into two: instantiateFileSystem and 
> initializeFileSystem
> # each thread to instantiate the FS, put() it into the new map
> # If there was one already, discard the old one and wait for the new one to 
> be ready via a call to Object.wait()
> # If there wasn't an entry, call initializeFileSystem() and then, finally, 
> call Object.notifyAll(), and move it from the map of filesystems being 
> initialized to the map of created filesystems
> This sounds too straightforward to be that simple; the trouble spots are 
> probably related to race conditions moving entries between the two maps and 
> making sure that no thread will block on the FS being initialized while it 
> has already been initialized (and so wait() will block forever).
> Rather than seek perfection, it may be safest to go for a best-effort 
> optimisation of the number of FS instances created/initialized. That is: it's 
> better to maybe create a few more FS instances than needed than it is to 
> block forever.
> Something is doable here, it's just not quick-and-dirty. Testing will be 
> "fun"; probably best to isolate this new logic somewhere where we can 
> simulate slow starts on one thread with many other threads waiting for it.
> A simpler option would be to have a lock on the construction process: only 
> one FS can be instantiated per user at a time.
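
The "simpler option" above, generalized from a lock to a counting semaphore as the tests in the diff suggest, might look roughly like the sketch below. This is an illustration only, not the code in the pull request: the class `SemaphoredCache`, its `get` method and the `slowFactory` parameter are hypothetical names, and the real `FileSystem.Cache` may be structured quite differently. Only the idea of a permit count (cf. `FS_CREATION_PARALLEL_COUNT`) and a counter of discarded instances is taken from the test code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

/**
 * Hypothetical sketch of a cache which limits how many instances
 * may be constructed in parallel. Not the Hadoop implementation.
 */
final class SemaphoredCache<K, V> {
  private final Map<K, V> map = new ConcurrentHashMap<>();
  private final Semaphore creatorPermits;
  private final AtomicLong discarded = new AtomicLong();

  /** @param permits number of instances which may be created at once. */
  SemaphoredCache(int permits) {
    this.creatorPermits = new Semaphore(permits);
  }

  /** Return the cached value for the key, creating it if needed. */
  V get(K key, Function<K, V> slowFactory) throws InterruptedException {
    // fast path: no permit needed when the instance already exists
    V existing = map.get(key);
    if (existing != null) {
      return existing;
    }
    // slow path: wait for permission to create an instance
    creatorPermits.acquire();
    try {
      // re-check: another thread may have finished while we were blocked
      existing = map.get(key);
      if (existing != null) {
        return existing;
      }
      V created = slowFactory.apply(key);
      V winner = map.putIfAbsent(key, created);
      if (winner != null) {
        // lost the race to a parallel creator: count the surplus instance
        discarded.incrementAndGet();
        return winner;
      }
      return created;
    } finally {
      creatorPermits.release();
    }
  }

  /** Number of instances created and then thrown away. */
  long getDiscardedInstances() {
    return discarded.get();
  }
}
```

In this sketch a single permit means that no surplus instance is ever created, because every waiting thread re-checks the map after acquiring the permit. With more permits, at most one fewer than the permit count can be created and discarded for a given key, which is the trade-off the description calls a best-effort optimisation: a few wasted instances rather than any risk of blocking forever.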



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org
