This is an automated email from the ASF dual-hosted git repository.

sollhui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e0dad7a78a [fix](broker) Fix broker OOM caused by file system netty 
thread pool leak (issue #64138) (#64224)
4e0dad7a78a is described below

commit 4e0dad7a78ad053b3563d1e0f574e3c31eb2d93b
Author: LompleZ Liu <[email protected]>
AuthorDate: Thu Jun 11 19:47:21 2026 +0800

    [fix](broker) Fix broker OOM caused by file system netty thread pool leak 
(issue #64138) (#64224)
    
    Refactor the broker file system cache logic:
    1. Use ConcurrentHashMap.compute() to make updates to the file system
    cache atomic, so that each file system identity maps to a single cached
    instance as far as possible.
    2. Ensure that when a fs is evicted from the cache, a fs that is still
    being read or written is not shut down abnormally:
    2.1 BrokerFileSystem adds reference counting for input/output streams
    and for active operations
    2.2 Add a fs recycle bin mechanism: an evicted fs is not closed
    immediately, but is first placed in the recycle bin
    2.3 Add a recycle bin cleanup thread
    
    Previously, when Thread A encountered a storage access/operation failure
    and consequently evicted the cached FileSystem (FS), forcefully closing
    the file system at that time would cause Thread B, which was currently
    using this file system for reading/writing data, to fail. Now, we put
    the evicted file system into a recycle bin, and it will be safely closed
    only when its reference count reaches 0.
    Even if input/output streams encounter unexpected situations and are not
    closed for a long time, causing the reference count to never reach 0, we
    can still rely on the timeout detection for input/output streams in
    CheckClientExpirationTask (client_expire_seconds) as a safety measure to
    forcefully close the input/output streams, ensuring that the file
    systems in the recycle bin can eventually be closed.
    
    close #64138
    
    <img width="10714" height="8736" alt="导出图(包含画布数据) excalidraw"
    src="https://github.com/user-attachments/assets/fefef9ac-8fa8-4e2a-a2e3-ce7166c38b5b"
    />
    
    Co-authored-by: liuhaozhe03 <[email protected]>
---
 .../org/apache/doris/broker/hdfs/BrokerConfig.java |   9 +-
 .../apache/doris/broker/hdfs/BrokerFileSystem.java |  48 +++-
 .../doris/broker/hdfs/ClientContextManager.java    |  88 ++----
 .../doris/broker/hdfs/FileSystemManager.java       | 313 ++++++++++++---------
 4 files changed, 249 insertions(+), 209 deletions(-)

diff --git 
a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerConfig.java
 
b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerConfig.java
index 5fd17706a1c..bc149732cf3 100644
--- 
a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerConfig.java
+++ 
b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerConfig.java
@@ -29,14 +29,11 @@ public class BrokerConfig extends ConfigBase {
     public static int hdfs_write_buffer_size_kb = 1024;
 
     @ConfField
+    // Do not set this value too small.
+    // Otherwise, a file system still being read/written by some thread in the 
fileSystemRecycleBin
+    // may be abnormally closed due to the small value, causing BE execution 
failure.
     public static int client_expire_seconds = 3600;
 
-    @ConfField
-    public static boolean enable_input_stream_expire_check = false;
-
-    @ConfField
-    public static int input_stream_expire_seconds = 300;
-
     @ConfField
     public static int broker_ipc_port = 8000;
 }
diff --git 
a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerFileSystem.java
 
b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerFileSystem.java
index 6c32f32f9ac..3d3232dc120 100644
--- 
a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerFileSystem.java
+++ 
b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerFileSystem.java
@@ -17,7 +17,9 @@
 
 package org.apache.doris.broker.hdfs;
 
+import java.io.IOException;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.hadoop.fs.FileSystem;
@@ -25,14 +27,15 @@ import org.apache.log4j.Logger;
 
 public class BrokerFileSystem {
 
-    private static Logger logger = Logger
-            .getLogger(BrokerFileSystem.class.getName());
+    private static Logger logger = 
Logger.getLogger(BrokerFileSystem.class.getName());
 
     private ReentrantLock lock;
     private FileSystemIdentity identity;
-    private FileSystem dfsFileSystem;
+    private volatile FileSystem dfsFileSystem;
     private volatile long lastAccessTimestamp;
     private UUID fileSystemId;
+    private final AtomicInteger activeOperationCount = new AtomicInteger(0);
+    private final AtomicInteger activeStreamCount = new AtomicInteger(0);
 
     public BrokerFileSystem(FileSystemIdentity identity) {
         this.identity = identity;
@@ -42,21 +45,24 @@ public class BrokerFileSystem {
         this.fileSystemId = UUID.randomUUID();
     }
 
-    public synchronized void setFileSystem(FileSystem fileSystem) {
-        this.dfsFileSystem = fileSystem;
-        this.lastAccessTimestamp = System.currentTimeMillis();
+    public void setFileSystem(FileSystem fileSystem) {
+        lock.lock();
+        try {
+            this.dfsFileSystem = fileSystem;
+            this.lastAccessTimestamp = System.currentTimeMillis();
+        } finally {
+            lock.unlock();
+        }
     }
 
-    public void closeFileSystem() {
+    public void closeFileSystem() throws IOException {
         lock.lock();
         try {
             if (this.dfsFileSystem != null) {
                 try {
-                    // do not close file system, it will be closed 
automatically.
-                    // this.dfsFileSystem.close();
-                } catch (Exception e) {
-                    logger.error("errors while close file system", e);
+                    this.dfsFileSystem.close();
                 } finally {
+                    // Even if it fails, do not call close again, just set 
null.
                     this.dfsFileSystem = null;
                 }
             }
@@ -82,6 +88,26 @@ public class BrokerFileSystem {
         return lock;
     }
 
+    public UUID getFileSystemId() {
+        return fileSystemId;
+    }
+
+    public long getLastAccessTimestamp() {
+        return lastAccessTimestamp;
+    }
+
+    // now we only call incrementActiveOperations in updateCachedFileSystem.
+    // concurrentHashMap.compute() ensures atomicity,
+    // and all "brokerFileSystem object" that call this method must be in 
"FileSystemManager.cachedFileSystem",
+    // so there is no need to lock it
+    public void incrementActiveOperations() { 
activeOperationCount.incrementAndGet(); }
+    public void decrementActiveOperations() { 
activeOperationCount.decrementAndGet(); }
+    public int  getActiveOperationsCount() { return 
activeOperationCount.get(); }
+
+    public void incrementActiveStreams() { 
activeStreamCount.incrementAndGet(); }
+    public void decrementActiveStreams() { 
activeStreamCount.decrementAndGet(); }
+    public int  getActiveStreamCount() { return activeStreamCount.get(); }
+
     public boolean isExpiredByLastAccessTime() {
         return System.currentTimeMillis() - lastAccessTimestamp > 
BrokerConfig.client_expire_seconds * 1000L;
     }
diff --git 
a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/ClientContextManager.java
 
b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/ClientContextManager.java
index 8e2fcfe60de..9492cf024fc 100644
--- 
a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/ClientContextManager.java
+++ 
b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/ClientContextManager.java
@@ -22,7 +22,9 @@ import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -34,10 +36,14 @@ import org.apache.doris.thrift.TBrokerOperationStatusCode;
 
 public class ClientContextManager {
 
-    private static Logger logger = Logger
-        .getLogger(ClientContextManager.class.getName());
-    private ScheduledExecutorService clientCheckExecutorService;
-    private ScheduledExecutorService inputStreamCheckExecuterService;
+    private static Logger logger = 
Logger.getLogger(ClientContextManager.class.getName());
+    private ScheduledExecutorService clientContextCheckExecutorService = 
Executors.newScheduledThreadPool(2,
+        new ThreadFactory() {
+            private final AtomicInteger threadNumber = new AtomicInteger(1);
+
+            @Override
+            public Thread newThread(Runnable r) { return new Thread(r, 
"clientContextCleanup-pool-" + threadNumber.getAndIncrement()); }
+        });
     private ConcurrentHashMap<String, ClientResourceContext> clientContexts;
     private ConcurrentHashMap<TBrokerFD, String> fdToClientMap;
     private int clientExpirationSeconds = BrokerConfig.client_expire_seconds;
@@ -45,12 +51,7 @@ public class ClientContextManager {
     public ClientContextManager() {
         clientContexts = new ConcurrentHashMap<>();
         fdToClientMap = new ConcurrentHashMap<>();
-        this.clientCheckExecutorService = Executors.newScheduledThreadPool(2);
-        this.clientCheckExecutorService.schedule(new 
CheckClientExpirationTask(), 0, TimeUnit.SECONDS);
-        if (BrokerConfig.enable_input_stream_expire_check) {
-            this.inputStreamCheckExecuterService = 
Executors.newScheduledThreadPool(2);
-            this.inputStreamCheckExecuterService.schedule(new 
CheckInputStreamExpirationTask(), 0, TimeUnit.SECONDS);
-        }
+        this.clientContextCheckExecutorService.schedule(new 
CheckClientExpirationTask(), 0, TimeUnit.SECONDS);
     }
 
     public void onPing(String clientId) {
@@ -110,12 +111,14 @@ public class ClientContextManager {
         }
         ClientResourceContext clientContext = clientContexts.get(clientId);
         BrokerInputStream brokerInputStream = 
clientContext.inputStreams.remove(fd);
-        try {
-            if (brokerInputStream != null) {
+        if (brokerInputStream != null) {
+            try {
                 brokerInputStream.inputStream.close();
+            } catch (Exception e) {
+                logger.error("errors while close file data input stream", e);
+            } finally {
+                brokerInputStream.brokerFileSystem.decrementActiveStreams();
             }
-        } catch (Exception e) {
-            logger.error("errors while close file data input stream", e);
         }
     }
 
@@ -126,30 +129,13 @@ public class ClientContextManager {
         }
         ClientResourceContext clientContext = clientContexts.get(clientId);
         BrokerOutputStream brokerOutputStream = 
clientContext.outputStreams.remove(fd);
-        try {
-            if (brokerOutputStream != null) {
+        if (brokerOutputStream != null) {
+            try {
                 brokerOutputStream.outputStream.close();
-            }
-        } catch (Exception e) {
-            logger.error("errors while close file data output stream", e);
-        }
-    }
-
-    public synchronized void remoteExpireInputStreams() {
-        int inputStreamExpireSeconds = 
BrokerConfig.input_stream_expire_seconds;
-        TBrokerFD fd;
-        for (ClientResourceContext clientContext : clientContexts.values()) {
-            Iterator<Entry<TBrokerFD, BrokerInputStream>> iter = 
clientContext.inputStreams.entrySet().iterator();
-            while (iter.hasNext()) {
-                Entry<TBrokerFD, BrokerInputStream> entry = iter.next();
-                fd = entry.getKey();
-                if (entry.getValue().checkExpire(inputStreamExpireSeconds)) {
-                    ClientContextManager.this.removeInputStream(fd);
-                    iter.remove();
-                    logger.info(fd + " in client [" + clientContext.clientId
-                            + "] is expired, remove it from contexts. last 
update time is "
-                            + entry.getValue().getLastPingTimestamp());
-                }
+            } catch (Exception e) {
+                logger.error("errors while close file data output stream", e);
+            } finally {
+                brokerOutputStream.brokerFileSystem.decrementActiveStreams();
             }
         }
     }
@@ -159,7 +145,7 @@ public class ClientContextManager {
         public void run() {
             try {
                 for (ClientResourceContext clientContext : 
clientContexts.values()) {
-                    if (System.currentTimeMillis() - 
clientContext.lastAccessTimestamp > clientExpirationSeconds * 1000) {
+                    if (System.currentTimeMillis() - 
clientContext.lastAccessTimestamp > clientExpirationSeconds * 1000L) {
                         for (TBrokerFD fd : 
clientContext.inputStreams.keySet()) {
                             ClientContextManager.this.removeInputStream(fd);
                         }
@@ -173,18 +159,7 @@ public class ClientContextManager {
                     }
                 }
             } finally {
-                
ClientContextManager.this.clientCheckExecutorService.schedule(this, 60, 
TimeUnit.SECONDS);
-            }
-        }
-    }
-
-    class CheckInputStreamExpirationTask implements Runnable {
-        @Override
-        public void run() {
-            try {
-                ClientContextManager.this.remoteExpireInputStreams();
-            } finally {
-                
ClientContextManager.this.inputStreamCheckExecuterService.schedule(this, 60, 
TimeUnit.SECONDS);
+                
ClientContextManager.this.clientContextCheckExecutorService.schedule(this, 60, 
TimeUnit.SECONDS);
             }
         }
     }
@@ -198,6 +173,7 @@ public class ClientContextManager {
             this.outputStream = outputStream;
             this.brokerFileSystem = brokerFileSystem;
             this.brokerFileSystem.updateLastUpdateAccessTime();
+            this.brokerFileSystem.incrementActiveStreams();
         }
 
         public FSDataOutputStream getOutputStream() {
@@ -214,32 +190,22 @@ public class ClientContextManager {
 
         private final FSDataInputStream inputStream;
         private final BrokerFileSystem brokerFileSystem;
-        private AtomicLong lastPingTimestamp;
 
         public BrokerInputStream(FSDataInputStream inputStream, 
BrokerFileSystem brokerFileSystem) {
             this.inputStream = inputStream;
             this.brokerFileSystem = brokerFileSystem;
             this.brokerFileSystem.updateLastUpdateAccessTime();
-            this.lastPingTimestamp = new 
AtomicLong(System.currentTimeMillis());
+            this.brokerFileSystem.incrementActiveStreams();
         }
 
         public FSDataInputStream getInputStream() {
             this.brokerFileSystem.updateLastUpdateAccessTime();
-            this.lastPingTimestamp.set(System.currentTimeMillis());
             return inputStream;
         }
 
         public void updateLastUpdateAccessTime() {
             this.brokerFileSystem.updateLastUpdateAccessTime();
         }
-
-        public boolean checkExpire(long expireSecond) {
-            return System.currentTimeMillis() - lastPingTimestamp.get() > 
expireSecond * 1000;
-        }
-
-        public long getLastPingTimestamp() {
-            return lastPingTimestamp.get();
-        }
     }
 
     static class ClientResourceContext {
diff --git 
a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
 
b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
index 364035d6cdb..7a2d522d243 100644
--- 
a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
+++ 
b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
@@ -45,7 +45,6 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.URI;
-import java.net.URISyntaxException;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileLock;
@@ -60,11 +59,16 @@ import java.util.Map;
 import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class FileSystemManager {
 
-    private static Logger logger = Logger
-            .getLogger(FileSystemManager.class.getName());
+    private static Logger logger = 
Logger.getLogger(FileSystemManager.class.getName());
     // supported scheme
     private static final String HDFS_SCHEME = "hdfs";
     private static final String VIEWFS_SCHEME = "viewfs";
@@ -80,9 +84,9 @@ public class FileSystemManager {
     private static final String GFS_SCHEME = "gfs";
 
     private static final String GCS_SCHEME = "gs";
-    
+
     private static final String FS_PREFIX = "fs.";
-    
+
     private static final String GCS_PROJECT_ID_KEY = "fs.gs.project.id";
 
     private static final String USER_NAME_KEY = "username";
@@ -167,11 +171,25 @@ public class FileSystemManager {
     private int writeBufferSize = 128 << 10; // 128k
 
     private ConcurrentHashMap<FileSystemIdentity, BrokerFileSystem> 
cachedFileSystem;
+    // Used to store expired file systems that have been phased out from the 
`cachedFileSystem` above,
+    // but there may be inputstreams and outputstreams that are transmitting 
data that depend on these file systems.
+    // Therefore, we use background threads to traverse this queue to 
determine whether it can be safely deleted.
+    private ConcurrentLinkedQueue<BrokerFileSystem> fileSystemRecycleBin;
     private ClientContextManager clientContextManager;
 
+    private ScheduledExecutorService fileSystemRecycleBinCleanupPool = 
Executors.newSingleThreadScheduledExecutor(
+        new ThreadFactory() {
+            private final AtomicInteger threadNumber = new AtomicInteger(1);
+
+            @Override
+            public Thread newThread(Runnable r) { return new Thread(r, 
"fileSystemCleanup-pool-" + threadNumber.getAndIncrement()); }
+        });
+
     public FileSystemManager() {
         cachedFileSystem = new ConcurrentHashMap<>();
+        fileSystemRecycleBin = new ConcurrentLinkedQueue<>();
         clientContextManager = new ClientContextManager();
+        fileSystemRecycleBinCleanupPool.scheduleWithFixedDelay(new 
CheckBrokerFileSystemExpirationTask(), 0, 3, TimeUnit.SECONDS);
         readBufferSize = BrokerConfig.hdfs_read_buffer_size_kb << 10;
         writeBufferSize = BrokerConfig.hdfs_write_buffer_size_kb << 10;
     }
@@ -198,13 +216,12 @@ public class FileSystemManager {
     /**
      * visible for test
      *
-     * @param path
-     * @param properties
+     * @param path the file path to be parsed (e.g., hdfs://...)
+     * @param properties the configuration properties for the file system
      * @return BrokerFileSystem with different FileSystem based on scheme
-     * @throws URISyntaxException
-     * @throws Exception
+     * @throws BrokerException if the input path is invalid or the scheme is 
not supported
      */
-    public BrokerFileSystem getFileSystem(String path, Map<String, String> 
properties) {
+    public BrokerFileSystem getFileSystem(String path, Map<String, String> 
properties) throws BrokerException {
         WildcardURI pathUri = new WildcardURI(path);
         String scheme = pathUri.getUri().getScheme();
         if (Strings.isNullOrEmpty(scheme)) {
@@ -249,11 +266,6 @@ public class FileSystemManager {
      * file system handle is cached, the identity is host + username_password
      * it will have safety problem if only hostname is used because one user 
may specify username and password
      * and then access hdfs, another user may not specify username and 
password but could also access data
-     * @param path
-     * @param properties
-     * @return
-     * @throws URISyntaxException
-     * @throws Exception
      */
     public BrokerFileSystem getDistributedFileSystem(String path, Map<String, 
String> properties) {
         WildcardURI pathUri = new WildcardURI(path);
@@ -438,6 +450,7 @@ public class FileSystemManager {
             }
             return fileSystem;
         } catch (Exception e) {
+            fileSystem.decrementActiveOperations();
             logger.error("errors while connect to " + path, e);
             throw new 
BrokerException(TBrokerOperationStatusCode.NOT_AUTHORIZED, e);
         } finally {
@@ -447,13 +460,7 @@ public class FileSystemManager {
 
     /**
      * visible for test
-     *
      * file system handle is cached, the identity is host + accessKey_secretKey
-     * @param path
-     * @param properties
-     * @return
-     * @throws URISyntaxException
-     * @throws Exception
      */
     public BrokerFileSystem getS3AFileSystem(String path, Map<String, String> 
properties) {
         WildcardURI pathUri = new WildcardURI(path);
@@ -480,6 +487,7 @@ public class FileSystemManager {
             }
             return fileSystem;
         } catch (Exception e) {
+            fileSystem.decrementActiveOperations();
             logger.error("errors while connect to " + path, e);
             throw new 
BrokerException(TBrokerOperationStatusCode.NOT_AUTHORIZED, e);
         } finally {
@@ -519,6 +527,7 @@ public class FileSystemManager {
             }
             return fileSystem;
         } catch (Exception e) {
+            fileSystem.decrementActiveOperations();
             logger.error("errors while connect to " + path, e);
             throw new 
BrokerException(TBrokerOperationStatusCode.NOT_AUTHORIZED, e);
         } finally {
@@ -528,9 +537,6 @@ public class FileSystemManager {
 
     /**
      * file system handle is cached, the identity is endpoint + bucket + 
accessKey_secretKey
-     * @param path
-     * @param properties
-     * @return
      */
     public BrokerFileSystem getOBSFileSystem(String path, Map<String, String> 
properties) {
         WildcardURI pathUri = new WildcardURI(path);
@@ -541,7 +547,6 @@ public class FileSystemManager {
         String host = OBS_SCHEME + "://" + endpoint + "/" + 
pathUri.getUri().getHost();
         String obsUgi = accessKey + "," + secretKey;
         FileSystemIdentity fileSystemIdentity = new FileSystemIdentity(host, 
obsUgi);
-        cachedFileSystem.putIfAbsent(fileSystemIdentity, new 
BrokerFileSystem(fileSystemIdentity));
         BrokerFileSystem fileSystem = 
updateCachedFileSystem(fileSystemIdentity, properties);
         fileSystem.getLock().lock();
         try {
@@ -559,6 +564,7 @@ public class FileSystemManager {
             }
             return fileSystem;
         } catch (Exception e) {
+            fileSystem.decrementActiveOperations();
             logger.error("errors while connect to " + path, e);
             throw new 
BrokerException(TBrokerOperationStatusCode.NOT_AUTHORIZED, e);
         } finally {
@@ -568,14 +574,7 @@ public class FileSystemManager {
 
     /**
      * visible for test
-     * <p>
      * file system handle is cached, the identity is endpoint + bucket + 
accessKey_secretKey
-     *
-     * @param path
-     * @param properties
-     * @return
-     * @throws URISyntaxException
-     * @throws Exception
      */
     public BrokerFileSystem getKS3FileSystem(String path, Map<String, String> 
properties) {
         WildcardURI pathUri = new WildcardURI(path);
@@ -605,6 +604,7 @@ public class FileSystemManager {
             }
             return fileSystem;
         } catch (Exception e) {
+            fileSystem.decrementActiveOperations();
             logger.error("errors while connect to " + path, e);
             throw new 
BrokerException(TBrokerOperationStatusCode.NOT_AUTHORIZED, e);
         } finally {
@@ -612,11 +612,8 @@ public class FileSystemManager {
         }
     }
 
-   /**
+    /**
      * file system handle is cached, the identity is endpoint + bucket + 
accessKey_secretKey
-     * @param path
-     * @param properties
-     * @return
      */
     public BrokerFileSystem getOSSFileSystem(String path, Map<String, String> 
properties) {
         WildcardURI pathUri = new WildcardURI(path);
@@ -627,7 +624,6 @@ public class FileSystemManager {
         String host = OSS_SCHEME + "://" + endpoint + "/" + 
pathUri.getUri().getHost();
         String ossUgi = accessKey + "," + secretKey;
         FileSystemIdentity fileSystemIdentity = new FileSystemIdentity(host, 
ossUgi);
-        cachedFileSystem.putIfAbsent(fileSystemIdentity, new 
BrokerFileSystem(fileSystemIdentity));
         BrokerFileSystem fileSystem = 
updateCachedFileSystem(fileSystemIdentity, properties);
         fileSystem.getLock().lock();
         try {
@@ -645,6 +641,7 @@ public class FileSystemManager {
             }
             return fileSystem;
         } catch (Exception e) {
+            fileSystem.decrementActiveOperations();
             logger.error("errors while connect to " + path, e);
             throw new 
BrokerException(TBrokerOperationStatusCode.NOT_AUTHORIZED, e);
         } finally {
@@ -654,13 +651,7 @@ public class FileSystemManager {
 
     /**
      * visible for test
-     *
      * file system handle is cached, the identity is for all chdfs.
-     * @param path
-     * @param properties
-     * @return
-     * @throws URISyntaxException
-     * @throws Exception
      */
     public BrokerFileSystem getChdfsFileSystem(String path, Map<String, 
String> properties) {
         WildcardURI pathUri = new WildcardURI(path);
@@ -761,6 +752,7 @@ public class FileSystemManager {
             }
             return fileSystem;
         } catch (Exception e) {
+            fileSystem.decrementActiveOperations();
             logger.error("errors while connect to " + path, e);
             throw new 
BrokerException(TBrokerOperationStatusCode.NOT_AUTHORIZED, e);
         } finally {
@@ -770,14 +762,7 @@ public class FileSystemManager {
 
     /**
      * visible for test
-     * <p>
      * file system handle is cached, the identity is endpoint + bucket + 
accessKey_secretKey
-     *
-     * @param path
-     * @param properties
-     * @return
-     * @throws URISyntaxException
-     * @throws Exception
      */
     public BrokerFileSystem getCOSFileSystem(String path, Map<String, String> 
properties) {
         WildcardURI pathUri = new WildcardURI(path);
@@ -807,6 +792,7 @@ public class FileSystemManager {
             }
             return fileSystem;
         } catch (Exception e) {
+            fileSystem.decrementActiveOperations();
             logger.error("errors while connect to " + path, e);
             throw new 
BrokerException(TBrokerOperationStatusCode.NOT_AUTHORIZED, e);
         } finally {
@@ -816,14 +802,7 @@ public class FileSystemManager {
 
     /**
      * visible for test
-     * <p>
      * file system handle is cached, the identity is endpoint + bucket + 
accessKey_secretKey
-     *
-     * @param path
-     * @param properties
-     * @return
-     * @throws URISyntaxException
-     * @throws Exception
      */
     public BrokerFileSystem getBOSFileSystem(String path, Map<String, String> 
properties) {
         WildcardURI pathUri = new WildcardURI(path);
@@ -854,6 +833,7 @@ public class FileSystemManager {
             }
             return fileSystem;
         } catch (Exception e) {
+            fileSystem.decrementActiveOperations();
             logger.error("errors while connect to " + path, e);
             throw new 
BrokerException(TBrokerOperationStatusCode.NOT_AUTHORIZED, e);
         } finally {
@@ -863,13 +843,7 @@ public class FileSystemManager {
 
     /**
      * visible for test
-     *
-     * file system handle is cached, the identity is for all juicefs.
-     * @param path
-     * @param properties
-     * @return
-     * @throws URISyntaxException
-     * @throws Exception
+     * file system handle is cached, the identity is for all juiceFs.
      */
     public BrokerFileSystem getJuiceFileSystem(String path, Map<String, 
String> properties) {
         WildcardURI pathUri = new WildcardURI(path);
@@ -980,6 +954,7 @@ public class FileSystemManager {
             }
             return fileSystem;
         } catch (Exception e) {
+            fileSystem.decrementActiveOperations();
             logger.error("errors while connect to " + path, e);
             throw new 
BrokerException(TBrokerOperationStatusCode.NOT_AUTHORIZED, e);
         } finally {
@@ -995,15 +970,9 @@ public class FileSystemManager {
         String group = properties.containsKey(HADOOP_JOB_GROUP_NAME) ? 
properties.get(HADOOP_JOB_GROUP_NAME) : "";
         String afsUgi = username + "," + password;
         FileSystemIdentity fileSystemIdentity = new FileSystemIdentity(host, 
afsUgi, group);
-        cachedFileSystem.putIfAbsent(fileSystemIdentity, new 
BrokerFileSystem(fileSystemIdentity));
         BrokerFileSystem fileSystem = 
updateCachedFileSystem(fileSystemIdentity, properties);
         fileSystem.getLock().lock();
         try {
-            if (!cachedFileSystem.containsKey(fileSystemIdentity)) {
-                // this means the file system is closed by file system checker 
thread
-                // it is a corner case
-                return null;
-            }
             if (fileSystem.getDFSFileSystem() == null) {
                 logger.info("could not find file system for path " + path + " 
create a new one");
                 // create a new file system
@@ -1021,6 +990,7 @@ public class FileSystemManager {
             }
             return fileSystem;
         } catch (Exception e) {
+            fileSystem.decrementActiveOperations();
             logger.error("errors while connect to " + path, e);
             throw new 
BrokerException(TBrokerOperationStatusCode.NOT_AUTHORIZED, e, e.getMessage());
         } finally {
@@ -1028,13 +998,6 @@ public class FileSystemManager {
         }
     }
 
-    /**
-     * @param path
-     * @param properties
-     * @return
-     * @throws URISyntaxException
-     * @throws Exception
-     */
     public BrokerFileSystem getGooseFSFileSystem(String path, Map<String, 
String> properties) {
         WildcardURI pathUri = new WildcardURI(path);
         // endpoint is the server host, pathUri.getUri().getHost() is the 
bucket
@@ -1045,33 +1008,35 @@ public class FileSystemManager {
         String password = properties.getOrDefault(PASSWORD_KEY, "");
         String gfsUgi = username + "," + password;
         FileSystemIdentity fileSystemIdentity = new FileSystemIdentity(host, 
gfsUgi);
-        BrokerFileSystem brokerFileSystem = 
updateCachedFileSystem(fileSystemIdentity, properties);
-        brokerFileSystem.getLock().lock();
+        BrokerFileSystem fileSystem = 
updateCachedFileSystem(fileSystemIdentity, properties);
+        fileSystem.getLock().lock();
         try {
-            if (brokerFileSystem.getDFSFileSystem() == null) {
-                logger.info("create goosefs client: " + path);
+            if (fileSystem.getDFSFileSystem() == null) {
+                logger.info("create gooseFs client: " + path);
                 Configuration conf = new Configuration();
                 for (Map.Entry<String, String> propElement : 
properties.entrySet()) {
                     conf.set(propElement.getKey(), propElement.getValue());
                 }
-                FileSystem fileSystem = FileSystem.get(pathUri.getUri(), conf);
-                brokerFileSystem.setFileSystem(fileSystem);
+                FileSystem gooseFileSystem = FileSystem.get(pathUri.getUri(), 
conf);
+                fileSystem.setFileSystem(gooseFileSystem);
             }
-            return brokerFileSystem;
+            return fileSystem;
         } catch (Exception e) {
+            fileSystem.decrementActiveOperations();
             logger.error("errors while connect to " + path, e);
             throw new 
BrokerException(TBrokerOperationStatusCode.NOT_AUTHORIZED, e);
         } finally {
-            brokerFileSystem.getLock().unlock();
+            fileSystem.getLock().unlock();
         }
     }
 
     public List<TBrokerFileStatus> listLocatedFiles(String path, boolean 
onlyFiles,
                                                     boolean recursive, 
Map<String, String> properties) {
         List<TBrokerFileStatus> resultFileStatus = null;
-        BrokerFileSystem fileSystem = getFileSystem(path, properties);
+        BrokerFileSystem fileSystem = null;
         Path locatedPath = new Path(path);
         try {
+            fileSystem = getFileSystem(path, properties);
             FileSystem innerFileSystem = fileSystem.getDFSFileSystem();
             RemoteIterator<LocatedFileStatus> locatedFiles = onlyFiles ? 
innerFileSystem.listFiles(locatedPath, recursive)
                 : innerFileSystem.listLocatedStatus(locatedPath);
@@ -1082,9 +1047,11 @@ public class FileSystemManager {
                 e, "file not found");
         } catch (Exception e) {
             logger.error("errors while get file status ", e);
-            fileSystem.closeFileSystem();
+            if (fileSystem != null) 
moveBrokerFileSystemToRecycleBin(fileSystem);
             throw new 
BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR,
                 e, "unknown error when listLocatedFiles");
+        } finally {
+            if (fileSystem != null) fileSystem.decrementActiveOperations();
         }
     }
 
@@ -1112,9 +1079,10 @@ public class FileSystemManager {
     public List<TBrokerFileStatus> listPath(String path, boolean fileNameOnly, 
Map<String, String> properties) {
         List<TBrokerFileStatus> resultFileStatus = null;
         WildcardURI pathUri = new WildcardURI(path);
-        BrokerFileSystem fileSystem = getFileSystem(path, properties);
         Path pathPattern = new Path(pathUri.getPath());
+        BrokerFileSystem fileSystem = null;
         try {
+            fileSystem = getFileSystem(path, properties);
             FileStatus[] files = 
fileSystem.getDFSFileSystem().globStatus(pathPattern);
             if (files == null) {
                 resultFileStatus = new ArrayList<>(0);
@@ -1147,24 +1115,29 @@ public class FileSystemManager {
                     e, "file not found");
         } catch (Exception e) {
             logger.error("errors while get file status ", e);
-            fileSystem.closeFileSystem();
+            if (fileSystem != null) 
moveBrokerFileSystemToRecycleBin(fileSystem);
             throw new 
BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR,
                     e, "unknown error when get file status");
+        } finally {
+            if (fileSystem != null) fileSystem.decrementActiveOperations();
         }
         return resultFileStatus;
     }
 
     public void deletePath(String path, Map<String, String> properties) {
         WildcardURI pathUri = new WildcardURI(path);
-        BrokerFileSystem fileSystem = getFileSystem(path, properties);
+        BrokerFileSystem fileSystem = null;
         Path filePath = new Path(pathUri.getPath());
         try {
+            fileSystem = getFileSystem(path, properties);
             fileSystem.getDFSFileSystem().delete(filePath, true);
-        } catch (IOException e) {
+        } catch (Exception e) {
             logger.error("errors while delete path " + path);
-            fileSystem.closeFileSystem();
+            if (fileSystem != null) 
moveBrokerFileSystemToRecycleBin(fileSystem);
             throw new 
BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR,
                     e, "delete path {} error", path);
+        } finally {
+            if (fileSystem != null) fileSystem.decrementActiveOperations();
         }
     }
 
@@ -1175,54 +1148,63 @@ public class FileSystemManager {
             throw new 
BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR,
                     "only allow rename in same file system");
         }
-        BrokerFileSystem fileSystem = getFileSystem(srcPath, properties);
+        BrokerFileSystem fileSystem = null;
         Path srcfilePath = new Path(srcPathUri.getPath());
         Path destfilePath = new Path(destPathUri.getPath());
         try {
+            fileSystem = getFileSystem(srcPath, properties);
             boolean isRenameSuccess = 
fileSystem.getDFSFileSystem().rename(srcfilePath, destfilePath);
             if (!isRenameSuccess) {
                 throw new 
BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR,
                         "failed to rename path from {} to {}", srcPath, 
destPath);
             }
-        } catch (IOException e) {
+        } catch (Exception e) {
             logger.error("errors while rename path from " + srcPath + " to " + 
destPath);
-            fileSystem.closeFileSystem();
+            if (fileSystem != null) 
moveBrokerFileSystemToRecycleBin(fileSystem);
             throw new 
BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR,
                     e, "errors while rename {} to {}", srcPath, destPath);
+        } finally {
+            if (fileSystem != null) fileSystem.decrementActiveOperations();
         }
     }
 
     public boolean checkPathExist(String path, Map<String, String> properties) 
{
         WildcardURI pathUri = new WildcardURI(path);
-        BrokerFileSystem fileSystem = getFileSystem(path, properties);
+        BrokerFileSystem fileSystem = null;
         Path filePath = new Path(pathUri.getPath());
         try {
+            fileSystem = getFileSystem(path, properties);
             boolean isPathExist = 
fileSystem.getDFSFileSystem().exists(filePath);
             return isPathExist;
-        } catch (IOException e) {
+        } catch (Exception e) {
             logger.error("errors while check path exist: " + path);
-            fileSystem.closeFileSystem();
+            if (fileSystem != null) 
moveBrokerFileSystemToRecycleBin(fileSystem);
             throw new 
BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR,
                     e, "errors while check if path {} exist", path);
+        } finally {
+            if (fileSystem != null) fileSystem.decrementActiveOperations();
         }
     }
 
     public TBrokerFD openReader(String clientId, String path, long 
startOffset, Map<String, String> properties) {
         WildcardURI pathUri = new WildcardURI(path);
         Path inputFilePath = new Path(pathUri.getPath());
-        BrokerFileSystem fileSystem = getFileSystem(path, properties);
+        BrokerFileSystem fileSystem = null;
         try {
+            fileSystem = getFileSystem(path, properties);
             FSDataInputStream fsDataInputStream = 
fileSystem.getDFSFileSystem().open(inputFilePath, readBufferSize);
             fsDataInputStream.seek(startOffset);
             UUID uuid = UUID.randomUUID();
             TBrokerFD fd = parseUUIDToFD(uuid);
             clientContextManager.putNewInputStream(clientId, fd, 
fsDataInputStream, fileSystem);
             return fd;
-        } catch (IOException e) {
+        } catch (Exception e) {
             logger.error("errors while open path", e);
-            fileSystem.closeFileSystem();
+            if (fileSystem != null) 
moveBrokerFileSystemToRecycleBin(fileSystem);
             throw new 
BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR,
                     e, "could not open file {}", path);
+        } finally {
+            if (fileSystem != null) fileSystem.decrementActiveOperations();
         }
     }
 
@@ -1310,19 +1292,22 @@ public class FileSystemManager {
     public TBrokerFD openWriter(String clientId, String path, Map<String, 
String> properties) {
         WildcardURI pathUri = new WildcardURI(path);
         Path inputFilePath = new Path(pathUri.getPath());
-        BrokerFileSystem fileSystem = getFileSystem(path, properties);
+        BrokerFileSystem fileSystem = null;
         try {
+            fileSystem = getFileSystem(path, properties);
             FSDataOutputStream fsDataOutputStream = 
fileSystem.getDFSFileSystem().create(inputFilePath,
                     true, writeBufferSize);
             UUID uuid = UUID.randomUUID();
             TBrokerFD fd = parseUUIDToFD(uuid);
             clientContextManager.putNewOutputStream(clientId, fd, 
fsDataOutputStream, fileSystem);
             return fd;
-        } catch (IOException e) {
+        } catch (Exception e) {
             logger.error("errors while open path", e);
-            fileSystem.closeFileSystem();
+            if (fileSystem != null) 
moveBrokerFileSystemToRecycleBin(fileSystem);
             throw new 
BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR,
                     e, "could not open file {}", path);
+        } finally {
+            if (fileSystem != null) fileSystem.decrementActiveOperations();
         }
     }
 
@@ -1385,55 +1370,121 @@ public class FileSystemManager {
      *   2. For other authentication modes, the lastAccessTime is used to 
determine whether it has expired
      */
     private BrokerFileSystem updateCachedFileSystem(FileSystemIdentity 
fileSystemIdentity, Map<String, String> properties) {
-        BrokerFileSystem brokerFileSystem;
-        if (cachedFileSystem.containsKey(fileSystemIdentity)) {
-            brokerFileSystem = cachedFileSystem.get(fileSystemIdentity);
+        return cachedFileSystem.compute(fileSystemIdentity, (key, 
brokerFileSystemInMap) -> {
+            if (brokerFileSystemInMap == null) {
+                BrokerFileSystem newBrokerFileSystem = new 
BrokerFileSystem(fileSystemIdentity);
+                newBrokerFileSystem.incrementActiveOperations();
+                return newBrokerFileSystem;
+            }
             if (UserGroupInformation.isSecurityEnabled()) {
                 try {
                     
UserGroupInformation.getCurrentUser().checkTGTAndReloginFromKeytab();
                 } catch (Exception e) {
                     logger.error("errors while refresh TGT: ", e);
                 }
-            } else if (brokerFileSystem.isExpiredByLastAccessTime()) {
-                brokerFileSystem.getLock().lock();
-                BrokerFileSystem bfs = 
cachedFileSystem.get(fileSystemIdentity);
-                if (!bfs.isExpiredByLastAccessTime()) {
-                  return bfs;
-                }
-                try {
-                    logger.info("file system " + brokerFileSystem + " is 
expired, update it.");
-                    brokerFileSystem.closeFileSystem();
-                } catch (Throwable t) {
-                    logger.error("errors while close file system: ", t);
-                } finally {
-                    brokerFileSystem.getLock().unlock();
-                }
-                brokerFileSystem = new BrokerFileSystem(fileSystemIdentity);
-                cachedFileSystem.put(fileSystemIdentity, brokerFileSystem);
+            } else if (brokerFileSystemInMap.isExpiredByLastAccessTime()) {
+                logger.info("file system " + brokerFileSystemInMap + " is 
expired, move to recycle bin and update it.");
+                fileSystemRecycleBin.add(brokerFileSystemInMap);
+                BrokerFileSystem newBrokerFileSystem = new 
BrokerFileSystem(fileSystemIdentity);
+                newBrokerFileSystem.incrementActiveOperations();
+                return newBrokerFileSystem;
             }
-        } else {
-            brokerFileSystem = new BrokerFileSystem(fileSystemIdentity);
-            cachedFileSystem.put(fileSystemIdentity, brokerFileSystem);
-        }
-        return brokerFileSystem;
+            brokerFileSystemInMap.incrementActiveOperations();
+            brokerFileSystemInMap.updateLastUpdateAccessTime();
+            return brokerFileSystemInMap;
+        });
     }
 
     public long fileSize(String path, Map<String, String> properties) {
         WildcardURI pathUri = new WildcardURI(path);
-        BrokerFileSystem fileSystem = getFileSystem(path, properties);
+        BrokerFileSystem fileSystem = null;
         Path filePath = new Path(pathUri.getPath());
         try {
+            fileSystem = getFileSystem(path, properties);
             FileStatus fileStatus = 
fileSystem.getDFSFileSystem().getFileStatus(filePath);
             if (fileStatus.isDirectory()) {
                 throw new 
BrokerException(TBrokerOperationStatusCode.INVALID_INPUT_FILE_PATH,
                     "not a file: {}", path);
             }
             return fileStatus.getLen();
-        } catch (IOException e) {
+        } catch (BrokerException e) {
+            // Handles the BrokerException thrown above when the path is a directory; 
for any other error code the file system is moved to the recycle bin.
+            if (e.errorCode != 
TBrokerOperationStatusCode.INVALID_INPUT_FILE_PATH) {
+                if (fileSystem != null) 
moveBrokerFileSystemToRecycleBin(fileSystem);
+            }
             logger.error("errors while getting file size: " + path);
-            fileSystem.closeFileSystem();
+            throw new 
BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR,
+                e, "errors while getting file size {}", path);
+        } catch (Exception e) {
+            logger.error("errors while getting file size: " + path);
+            if (fileSystem != null) 
moveBrokerFileSystemToRecycleBin(fileSystem);
             throw new 
BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR,
                     e, "errors while getting file size {}", path);
+        } finally {
+            if (fileSystem != null) fileSystem.decrementActiveOperations();
+        }
+    }
+
+    private void moveBrokerFileSystemToRecycleBin(BrokerFileSystem 
brokerFileSystem) {
+        if (brokerFileSystem == null) {
+            return;
+        }
+        cachedFileSystem.compute(brokerFileSystem.getIdentity(), (k, v)-> {
+            if (brokerFileSystem == v) {
+                fileSystemRecycleBin.add(brokerFileSystem);
+                return null;
+            }
+            // this brokerFileSystem has been moved to fileSystemRecycleBin by 
another thread, do nothing.
+            return v;
+        });
+    }
+
+    class CheckBrokerFileSystemExpirationTask implements Runnable {
+        @Override
+        public void run() {
+            UUID firstStillActiveFileSystemId = null;
+            BrokerFileSystem fs;
+            while ((fs = fileSystemRecycleBin.poll()) != null) {
+                fs.getLock().lock();
+                try {
+                    UUID fileSystemId = fs.getFileSystemId();
+                    // When retrieving the first fs that has been put back 
into the queue, break the loop.
+                    if (firstStillActiveFileSystemId != null && 
firstStillActiveFileSystemId.equals(fileSystemId)) {
+                        if (!ifSafeThenCloseFileSystem(fs)) {
+                            fileSystemRecycleBin.add(fs);
+                        }
+                        break;
+                    }
+                    if (!ifSafeThenCloseFileSystem(fs)) {
+                        if (firstStillActiveFileSystemId == null) {
+                            firstStillActiveFileSystemId = fileSystemId;
+                        }
+                        fileSystemRecycleBin.add(fs);
+                    }
+                } catch (IOException e) {
+                    // Fs.dfsFileSystem has been set to null, no need to add 
it back to the end of the queue.
+                    logger.warn("IO error while close file system: " + fs, e);
+                } catch (Exception e) {
+                    // Should not enter this branch! defensive programming
+                    fileSystemRecycleBin.add(fs);
+                    logger.error("unexpected errors while close file system, 
please check the code: " + fs, e);
+                } finally {
+                    fs.getLock().unlock();
+                }
+            }
+        }
+
+        private boolean ifSafeThenCloseFileSystem(BrokerFileSystem fs) throws 
IOException {
+            if (fs.getActiveOperationsCount() == 0 && 
fs.getActiveStreamCount() == 0) {
+                fs.closeFileSystem();
+                logger.info(fs + " is expired, remove it. " +
+                    "last access time is " + fs.getLastAccessTimestamp());
+                return true;
+            } else {
+                logger.info(fs + " has expired but still has 
activeOperationCount:"+ fs.getActiveOperationsCount() +
+                    " and activeStreamCount:" + fs.getActiveStreamCount()+ ", 
wait for the next round to close.");
+                return false;
+            }
         }
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to