This is an automated email from the ASF dual-hosted git repository.
sollhui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 4e0dad7a78a [fix](broker) Fix broker OOM caused by file system netty
thread pool leak (issue #64138) (#64224)
4e0dad7a78a is described below
commit 4e0dad7a78ad053b3563d1e0f574e3c31eb2d93b
Author: LompleZ Liu <[email protected]>
AuthorDate: Thu Jun 11 19:47:21 2026 +0800
[fix](broker) Fix broker OOM caused by file system netty thread pool leak
(issue #64138) (#64224)
Refactor the broker's file system cache logic:
1. Use ConcurrentHashMap.compute() so that updates to the fs cache are
atomic and, as far as possible, only a single instance of each file
system is created and shared
2. Ensure that when an fs is evicted from the cache, an fs that is still
being read or written is not shut down prematurely:
2.1 BrokerFileSystem now keeps reference counts of its open input/output
streams and of its in-flight operations; each stream or operation
increments the count while it is active
2.2 Add an fs recycle bin: an evicted fs is not closed immediately, but
is first placed in the recycle bin
2.3 Add a background thread that cleans up the recycle bin
Previously, when thread A hit a storage access/operation failure and
consequently evicted the cached FileSystem (FS), forcibly closing that
file system at that moment would cause thread B, which was still using
it to read or write data, to fail. Now the evicted file system is put
into a recycle bin, and it is closed safely only once its reference
count reaches 0.
Even if some input/output streams are unexpectedly left open for a long
time, so that the reference count never reaches 0, the client expiration
check in CheckClientExpirationTask (client_expire_seconds) still acts as
a safety net: it forcibly closes the input/output streams of expired
clients, which guarantees that the file systems in the recycle bin are
eventually closed.
close #64138
<img width="10714" height="8736" alt="导出图(包含画布数据) excalidraw"
src="https://github.com/user-attachments/assets/fefef9ac-8fa8-4e2a-a2e3-ce7166c38b5b"
/>
Co-authored-by: liuhaozhe03 <[email protected]>
---
.../org/apache/doris/broker/hdfs/BrokerConfig.java | 9 +-
.../apache/doris/broker/hdfs/BrokerFileSystem.java | 48 +++-
.../doris/broker/hdfs/ClientContextManager.java | 88 ++----
.../doris/broker/hdfs/FileSystemManager.java | 313 ++++++++++++---------
4 files changed, 249 insertions(+), 209 deletions(-)
diff --git
a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerConfig.java
b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerConfig.java
index 5fd17706a1c..bc149732cf3 100644
---
a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerConfig.java
+++
b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerConfig.java
@@ -29,14 +29,11 @@ public class BrokerConfig extends ConfigBase {
public static int hdfs_write_buffer_size_kb = 1024;
@ConfField
+ // Do not set this value too small.
+ // Otherwise, a file system still being read/written by some thread in the
fileSystemRecycleBin
+ // may be abnormally closed due to the small value, causing BE execution
failure.
public static int client_expire_seconds = 3600;
- @ConfField
- public static boolean enable_input_stream_expire_check = false;
-
- @ConfField
- public static int input_stream_expire_seconds = 300;
-
@ConfField
public static int broker_ipc_port = 8000;
}
diff --git
a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerFileSystem.java
b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerFileSystem.java
index 6c32f32f9ac..3d3232dc120 100644
---
a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerFileSystem.java
+++
b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerFileSystem.java
@@ -17,7 +17,9 @@
package org.apache.doris.broker.hdfs;
+import java.io.IOException;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.fs.FileSystem;
@@ -25,14 +27,15 @@ import org.apache.log4j.Logger;
public class BrokerFileSystem {
- private static Logger logger = Logger
- .getLogger(BrokerFileSystem.class.getName());
+ private static Logger logger =
Logger.getLogger(BrokerFileSystem.class.getName());
private ReentrantLock lock;
private FileSystemIdentity identity;
- private FileSystem dfsFileSystem;
+ private volatile FileSystem dfsFileSystem;
private volatile long lastAccessTimestamp;
private UUID fileSystemId;
+ private final AtomicInteger activeOperationCount = new AtomicInteger(0);
+ private final AtomicInteger activeStreamCount = new AtomicInteger(0);
public BrokerFileSystem(FileSystemIdentity identity) {
this.identity = identity;
@@ -42,21 +45,24 @@ public class BrokerFileSystem {
this.fileSystemId = UUID.randomUUID();
}
- public synchronized void setFileSystem(FileSystem fileSystem) {
- this.dfsFileSystem = fileSystem;
- this.lastAccessTimestamp = System.currentTimeMillis();
+ public void setFileSystem(FileSystem fileSystem) {
+ lock.lock();
+ try {
+ this.dfsFileSystem = fileSystem;
+ this.lastAccessTimestamp = System.currentTimeMillis();
+ } finally {
+ lock.unlock();
+ }
}
- public void closeFileSystem() {
+ public void closeFileSystem() throws IOException {
lock.lock();
try {
if (this.dfsFileSystem != null) {
try {
- // do not close file system, it will be closed
automatically.
- // this.dfsFileSystem.close();
- } catch (Exception e) {
- logger.error("errors while close file system", e);
+ this.dfsFileSystem.close();
} finally {
+ // Even if it fails, do not call close again, just set
null.
this.dfsFileSystem = null;
}
}
@@ -82,6 +88,26 @@ public class BrokerFileSystem {
return lock;
}
+ public UUID getFileSystemId() {
+ return fileSystemId;
+ }
+
+ public long getLastAccessTimestamp() {
+ return lastAccessTimestamp;
+ }
+
+ // now we only call incrementActiveOperations in updateCachedFileSystem.
+ // concurrentHashMap.compute() ensures atomicity,
+ // and all "brokerFileSystem object" that call this method must be in
"FileSystemManager.cachedFileSystem",
+ // so there is no need to lock it
+ public void incrementActiveOperations() {
activeOperationCount.incrementAndGet(); }
+ public void decrementActiveOperations() {
activeOperationCount.decrementAndGet(); }
+ public int getActiveOperationsCount() { return
activeOperationCount.get(); }
+
+ public void incrementActiveStreams() {
activeStreamCount.incrementAndGet(); }
+ public void decrementActiveStreams() {
activeStreamCount.decrementAndGet(); }
+ public int getActiveStreamCount() { return activeStreamCount.get(); }
+
public boolean isExpiredByLastAccessTime() {
return System.currentTimeMillis() - lastAccessTimestamp >
BrokerConfig.client_expire_seconds * 1000L;
}
diff --git
a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/ClientContextManager.java
b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/ClientContextManager.java
index 8e2fcfe60de..9492cf024fc 100644
---
a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/ClientContextManager.java
+++
b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/ClientContextManager.java
@@ -22,7 +22,9 @@ import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -34,10 +36,14 @@ import org.apache.doris.thrift.TBrokerOperationStatusCode;
public class ClientContextManager {
- private static Logger logger = Logger
- .getLogger(ClientContextManager.class.getName());
- private ScheduledExecutorService clientCheckExecutorService;
- private ScheduledExecutorService inputStreamCheckExecuterService;
+ private static Logger logger =
Logger.getLogger(ClientContextManager.class.getName());
+ private ScheduledExecutorService clientContextCheckExecutorService =
Executors.newScheduledThreadPool(2,
+ new ThreadFactory() {
+ private final AtomicInteger threadNumber = new AtomicInteger(1);
+
+ @Override
+ public Thread newThread(Runnable r) { return new Thread(r,
"clientContextCleanup-pool-" + threadNumber.getAndIncrement()); }
+ });
private ConcurrentHashMap<String, ClientResourceContext> clientContexts;
private ConcurrentHashMap<TBrokerFD, String> fdToClientMap;
private int clientExpirationSeconds = BrokerConfig.client_expire_seconds;
@@ -45,12 +51,7 @@ public class ClientContextManager {
public ClientContextManager() {
clientContexts = new ConcurrentHashMap<>();
fdToClientMap = new ConcurrentHashMap<>();
- this.clientCheckExecutorService = Executors.newScheduledThreadPool(2);
- this.clientCheckExecutorService.schedule(new
CheckClientExpirationTask(), 0, TimeUnit.SECONDS);
- if (BrokerConfig.enable_input_stream_expire_check) {
- this.inputStreamCheckExecuterService =
Executors.newScheduledThreadPool(2);
- this.inputStreamCheckExecuterService.schedule(new
CheckInputStreamExpirationTask(), 0, TimeUnit.SECONDS);
- }
+ this.clientContextCheckExecutorService.schedule(new
CheckClientExpirationTask(), 0, TimeUnit.SECONDS);
}
public void onPing(String clientId) {
@@ -110,12 +111,14 @@ public class ClientContextManager {
}
ClientResourceContext clientContext = clientContexts.get(clientId);
BrokerInputStream brokerInputStream =
clientContext.inputStreams.remove(fd);
- try {
- if (brokerInputStream != null) {
+ if (brokerInputStream != null) {
+ try {
brokerInputStream.inputStream.close();
+ } catch (Exception e) {
+ logger.error("errors while close file data input stream", e);
+ } finally {
+ brokerInputStream.brokerFileSystem.decrementActiveStreams();
}
- } catch (Exception e) {
- logger.error("errors while close file data input stream", e);
}
}
@@ -126,30 +129,13 @@ public class ClientContextManager {
}
ClientResourceContext clientContext = clientContexts.get(clientId);
BrokerOutputStream brokerOutputStream =
clientContext.outputStreams.remove(fd);
- try {
- if (brokerOutputStream != null) {
+ if (brokerOutputStream != null) {
+ try {
brokerOutputStream.outputStream.close();
- }
- } catch (Exception e) {
- logger.error("errors while close file data output stream", e);
- }
- }
-
- public synchronized void remoteExpireInputStreams() {
- int inputStreamExpireSeconds =
BrokerConfig.input_stream_expire_seconds;
- TBrokerFD fd;
- for (ClientResourceContext clientContext : clientContexts.values()) {
- Iterator<Entry<TBrokerFD, BrokerInputStream>> iter =
clientContext.inputStreams.entrySet().iterator();
- while (iter.hasNext()) {
- Entry<TBrokerFD, BrokerInputStream> entry = iter.next();
- fd = entry.getKey();
- if (entry.getValue().checkExpire(inputStreamExpireSeconds)) {
- ClientContextManager.this.removeInputStream(fd);
- iter.remove();
- logger.info(fd + " in client [" + clientContext.clientId
- + "] is expired, remove it from contexts. last
update time is "
- + entry.getValue().getLastPingTimestamp());
- }
+ } catch (Exception e) {
+ logger.error("errors while close file data output stream", e);
+ } finally {
+ brokerOutputStream.brokerFileSystem.decrementActiveStreams();
}
}
}
@@ -159,7 +145,7 @@ public class ClientContextManager {
public void run() {
try {
for (ClientResourceContext clientContext :
clientContexts.values()) {
- if (System.currentTimeMillis() -
clientContext.lastAccessTimestamp > clientExpirationSeconds * 1000) {
+ if (System.currentTimeMillis() -
clientContext.lastAccessTimestamp > clientExpirationSeconds * 1000L) {
for (TBrokerFD fd :
clientContext.inputStreams.keySet()) {
ClientContextManager.this.removeInputStream(fd);
}
@@ -173,18 +159,7 @@ public class ClientContextManager {
}
}
} finally {
-
ClientContextManager.this.clientCheckExecutorService.schedule(this, 60,
TimeUnit.SECONDS);
- }
- }
- }
-
- class CheckInputStreamExpirationTask implements Runnable {
- @Override
- public void run() {
- try {
- ClientContextManager.this.remoteExpireInputStreams();
- } finally {
-
ClientContextManager.this.inputStreamCheckExecuterService.schedule(this, 60,
TimeUnit.SECONDS);
+
ClientContextManager.this.clientContextCheckExecutorService.schedule(this, 60,
TimeUnit.SECONDS);
}
}
}
@@ -198,6 +173,7 @@ public class ClientContextManager {
this.outputStream = outputStream;
this.brokerFileSystem = brokerFileSystem;
this.brokerFileSystem.updateLastUpdateAccessTime();
+ this.brokerFileSystem.incrementActiveStreams();
}
public FSDataOutputStream getOutputStream() {
@@ -214,32 +190,22 @@ public class ClientContextManager {
private final FSDataInputStream inputStream;
private final BrokerFileSystem brokerFileSystem;
- private AtomicLong lastPingTimestamp;
public BrokerInputStream(FSDataInputStream inputStream,
BrokerFileSystem brokerFileSystem) {
this.inputStream = inputStream;
this.brokerFileSystem = brokerFileSystem;
this.brokerFileSystem.updateLastUpdateAccessTime();
- this.lastPingTimestamp = new
AtomicLong(System.currentTimeMillis());
+ this.brokerFileSystem.incrementActiveStreams();
}
public FSDataInputStream getInputStream() {
this.brokerFileSystem.updateLastUpdateAccessTime();
- this.lastPingTimestamp.set(System.currentTimeMillis());
return inputStream;
}
public void updateLastUpdateAccessTime() {
this.brokerFileSystem.updateLastUpdateAccessTime();
}
-
- public boolean checkExpire(long expireSecond) {
- return System.currentTimeMillis() - lastPingTimestamp.get() >
expireSecond * 1000;
- }
-
- public long getLastPingTimestamp() {
- return lastPingTimestamp.get();
- }
}
static class ClientResourceContext {
diff --git
a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
index 364035d6cdb..7a2d522d243 100644
---
a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
+++
b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
@@ -45,7 +45,6 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.URI;
-import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.FileLock;
@@ -60,11 +59,16 @@ import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
public class FileSystemManager {
- private static Logger logger = Logger
- .getLogger(FileSystemManager.class.getName());
+ private static Logger logger =
Logger.getLogger(FileSystemManager.class.getName());
// supported scheme
private static final String HDFS_SCHEME = "hdfs";
private static final String VIEWFS_SCHEME = "viewfs";
@@ -80,9 +84,9 @@ public class FileSystemManager {
private static final String GFS_SCHEME = "gfs";
private static final String GCS_SCHEME = "gs";
-
+
private static final String FS_PREFIX = "fs.";
-
+
private static final String GCS_PROJECT_ID_KEY = "fs.gs.project.id";
private static final String USER_NAME_KEY = "username";
@@ -167,11 +171,25 @@ public class FileSystemManager {
private int writeBufferSize = 128 << 10; // 128k
private ConcurrentHashMap<FileSystemIdentity, BrokerFileSystem>
cachedFileSystem;
+ // Used to store expired file systems that have been phased out from the
`cachedFileSystem` above,
+ // but there may be inputstreams and outputstreams that are transmitting
data that depend on these file systems.
+ // Therefore, we use background threads to traverse this queue to
determine whether it can be safely deleted.
+ private ConcurrentLinkedQueue<BrokerFileSystem> fileSystemRecycleBin;
private ClientContextManager clientContextManager;
+ private ScheduledExecutorService fileSystemRecycleBinCleanupPool =
Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactory() {
+ private final AtomicInteger threadNumber = new AtomicInteger(1);
+
+ @Override
+ public Thread newThread(Runnable r) { return new Thread(r,
"fileSystemCleanup-pool-" + threadNumber.getAndIncrement()); }
+ });
+
public FileSystemManager() {
cachedFileSystem = new ConcurrentHashMap<>();
+ fileSystemRecycleBin = new ConcurrentLinkedQueue<>();
clientContextManager = new ClientContextManager();
+ fileSystemRecycleBinCleanupPool.scheduleWithFixedDelay(new
CheckBrokerFileSystemExpirationTask(), 0, 3, TimeUnit.SECONDS);
readBufferSize = BrokerConfig.hdfs_read_buffer_size_kb << 10;
writeBufferSize = BrokerConfig.hdfs_write_buffer_size_kb << 10;
}
@@ -198,13 +216,12 @@ public class FileSystemManager {
/**
* visible for test
*
- * @param path
- * @param properties
+ * @param path the file path to be parsed (e.g., hdfs://...)
+ * @param properties the configuration properties for the file system
* @return BrokerFileSystem with different FileSystem based on scheme
- * @throws URISyntaxException
- * @throws Exception
+ * @throws BrokerException if the input path is invalid or the scheme is
not supported
*/
- public BrokerFileSystem getFileSystem(String path, Map<String, String>
properties) {
+ public BrokerFileSystem getFileSystem(String path, Map<String, String>
properties) throws BrokerException {
WildcardURI pathUri = new WildcardURI(path);
String scheme = pathUri.getUri().getScheme();
if (Strings.isNullOrEmpty(scheme)) {
@@ -249,11 +266,6 @@ public class FileSystemManager {
* file system handle is cached, the identity is host + username_password
* it will have safety problem if only hostname is used because one user
may specify username and password
* and then access hdfs, another user may not specify username and
password but could also access data
- * @param path
- * @param properties
- * @return
- * @throws URISyntaxException
- * @throws Exception
*/
public BrokerFileSystem getDistributedFileSystem(String path, Map<String,
String> properties) {
WildcardURI pathUri = new WildcardURI(path);
@@ -438,6 +450,7 @@ public class FileSystemManager {
}
return fileSystem;
} catch (Exception e) {
+ fileSystem.decrementActiveOperations();
logger.error("errors while connect to " + path, e);
throw new
BrokerException(TBrokerOperationStatusCode.NOT_AUTHORIZED, e);
} finally {
@@ -447,13 +460,7 @@ public class FileSystemManager {
/**
* visible for test
- *
* file system handle is cached, the identity is host + accessKey_secretKey
- * @param path
- * @param properties
- * @return
- * @throws URISyntaxException
- * @throws Exception
*/
public BrokerFileSystem getS3AFileSystem(String path, Map<String, String>
properties) {
WildcardURI pathUri = new WildcardURI(path);
@@ -480,6 +487,7 @@ public class FileSystemManager {
}
return fileSystem;
} catch (Exception e) {
+ fileSystem.decrementActiveOperations();
logger.error("errors while connect to " + path, e);
throw new
BrokerException(TBrokerOperationStatusCode.NOT_AUTHORIZED, e);
} finally {
@@ -519,6 +527,7 @@ public class FileSystemManager {
}
return fileSystem;
} catch (Exception e) {
+ fileSystem.decrementActiveOperations();
logger.error("errors while connect to " + path, e);
throw new
BrokerException(TBrokerOperationStatusCode.NOT_AUTHORIZED, e);
} finally {
@@ -528,9 +537,6 @@ public class FileSystemManager {
/**
* file system handle is cached, the identity is endpoint + bucket +
accessKey_secretKey
- * @param path
- * @param properties
- * @return
*/
public BrokerFileSystem getOBSFileSystem(String path, Map<String, String>
properties) {
WildcardURI pathUri = new WildcardURI(path);
@@ -541,7 +547,6 @@ public class FileSystemManager {
String host = OBS_SCHEME + "://" + endpoint + "/" +
pathUri.getUri().getHost();
String obsUgi = accessKey + "," + secretKey;
FileSystemIdentity fileSystemIdentity = new FileSystemIdentity(host,
obsUgi);
- cachedFileSystem.putIfAbsent(fileSystemIdentity, new
BrokerFileSystem(fileSystemIdentity));
BrokerFileSystem fileSystem =
updateCachedFileSystem(fileSystemIdentity, properties);
fileSystem.getLock().lock();
try {
@@ -559,6 +564,7 @@ public class FileSystemManager {
}
return fileSystem;
} catch (Exception e) {
+ fileSystem.decrementActiveOperations();
logger.error("errors while connect to " + path, e);
throw new
BrokerException(TBrokerOperationStatusCode.NOT_AUTHORIZED, e);
} finally {
@@ -568,14 +574,7 @@ public class FileSystemManager {
/**
* visible for test
- * <p>
* file system handle is cached, the identity is endpoint + bucket +
accessKey_secretKey
- *
- * @param path
- * @param properties
- * @return
- * @throws URISyntaxException
- * @throws Exception
*/
public BrokerFileSystem getKS3FileSystem(String path, Map<String, String>
properties) {
WildcardURI pathUri = new WildcardURI(path);
@@ -605,6 +604,7 @@ public class FileSystemManager {
}
return fileSystem;
} catch (Exception e) {
+ fileSystem.decrementActiveOperations();
logger.error("errors while connect to " + path, e);
throw new
BrokerException(TBrokerOperationStatusCode.NOT_AUTHORIZED, e);
} finally {
@@ -612,11 +612,8 @@ public class FileSystemManager {
}
}
- /**
+ /**
* file system handle is cached, the identity is endpoint + bucket +
accessKey_secretKey
- * @param path
- * @param properties
- * @return
*/
public BrokerFileSystem getOSSFileSystem(String path, Map<String, String>
properties) {
WildcardURI pathUri = new WildcardURI(path);
@@ -627,7 +624,6 @@ public class FileSystemManager {
String host = OSS_SCHEME + "://" + endpoint + "/" +
pathUri.getUri().getHost();
String ossUgi = accessKey + "," + secretKey;
FileSystemIdentity fileSystemIdentity = new FileSystemIdentity(host,
ossUgi);
- cachedFileSystem.putIfAbsent(fileSystemIdentity, new
BrokerFileSystem(fileSystemIdentity));
BrokerFileSystem fileSystem =
updateCachedFileSystem(fileSystemIdentity, properties);
fileSystem.getLock().lock();
try {
@@ -645,6 +641,7 @@ public class FileSystemManager {
}
return fileSystem;
} catch (Exception e) {
+ fileSystem.decrementActiveOperations();
logger.error("errors while connect to " + path, e);
throw new
BrokerException(TBrokerOperationStatusCode.NOT_AUTHORIZED, e);
} finally {
@@ -654,13 +651,7 @@ public class FileSystemManager {
/**
* visible for test
- *
* file system handle is cached, the identity is for all chdfs.
- * @param path
- * @param properties
- * @return
- * @throws URISyntaxException
- * @throws Exception
*/
public BrokerFileSystem getChdfsFileSystem(String path, Map<String,
String> properties) {
WildcardURI pathUri = new WildcardURI(path);
@@ -761,6 +752,7 @@ public class FileSystemManager {
}
return fileSystem;
} catch (Exception e) {
+ fileSystem.decrementActiveOperations();
logger.error("errors while connect to " + path, e);
throw new
BrokerException(TBrokerOperationStatusCode.NOT_AUTHORIZED, e);
} finally {
@@ -770,14 +762,7 @@ public class FileSystemManager {
/**
* visible for test
- * <p>
* file system handle is cached, the identity is endpoint + bucket +
accessKey_secretKey
- *
- * @param path
- * @param properties
- * @return
- * @throws URISyntaxException
- * @throws Exception
*/
public BrokerFileSystem getCOSFileSystem(String path, Map<String, String>
properties) {
WildcardURI pathUri = new WildcardURI(path);
@@ -807,6 +792,7 @@ public class FileSystemManager {
}
return fileSystem;
} catch (Exception e) {
+ fileSystem.decrementActiveOperations();
logger.error("errors while connect to " + path, e);
throw new
BrokerException(TBrokerOperationStatusCode.NOT_AUTHORIZED, e);
} finally {
@@ -816,14 +802,7 @@ public class FileSystemManager {
/**
* visible for test
- * <p>
* file system handle is cached, the identity is endpoint + bucket +
accessKey_secretKey
- *
- * @param path
- * @param properties
- * @return
- * @throws URISyntaxException
- * @throws Exception
*/
public BrokerFileSystem getBOSFileSystem(String path, Map<String, String>
properties) {
WildcardURI pathUri = new WildcardURI(path);
@@ -854,6 +833,7 @@ public class FileSystemManager {
}
return fileSystem;
} catch (Exception e) {
+ fileSystem.decrementActiveOperations();
logger.error("errors while connect to " + path, e);
throw new
BrokerException(TBrokerOperationStatusCode.NOT_AUTHORIZED, e);
} finally {
@@ -863,13 +843,7 @@ public class FileSystemManager {
/**
* visible for test
- *
- * file system handle is cached, the identity is for all juicefs.
- * @param path
- * @param properties
- * @return
- * @throws URISyntaxException
- * @throws Exception
+ * file system handle is cached, the identity is for all juiceFs.
*/
public BrokerFileSystem getJuiceFileSystem(String path, Map<String,
String> properties) {
WildcardURI pathUri = new WildcardURI(path);
@@ -980,6 +954,7 @@ public class FileSystemManager {
}
return fileSystem;
} catch (Exception e) {
+ fileSystem.decrementActiveOperations();
logger.error("errors while connect to " + path, e);
throw new
BrokerException(TBrokerOperationStatusCode.NOT_AUTHORIZED, e);
} finally {
@@ -995,15 +970,9 @@ public class FileSystemManager {
String group = properties.containsKey(HADOOP_JOB_GROUP_NAME) ?
properties.get(HADOOP_JOB_GROUP_NAME) : "";
String afsUgi = username + "," + password;
FileSystemIdentity fileSystemIdentity = new FileSystemIdentity(host,
afsUgi, group);
- cachedFileSystem.putIfAbsent(fileSystemIdentity, new
BrokerFileSystem(fileSystemIdentity));
BrokerFileSystem fileSystem =
updateCachedFileSystem(fileSystemIdentity, properties);
fileSystem.getLock().lock();
try {
- if (!cachedFileSystem.containsKey(fileSystemIdentity)) {
- // this means the file system is closed by file system checker
thread
- // it is a corner case
- return null;
- }
if (fileSystem.getDFSFileSystem() == null) {
logger.info("could not find file system for path " + path + "
create a new one");
// create a new file system
@@ -1021,6 +990,7 @@ public class FileSystemManager {
}
return fileSystem;
} catch (Exception e) {
+ fileSystem.decrementActiveOperations();
logger.error("errors while connect to " + path, e);
throw new
BrokerException(TBrokerOperationStatusCode.NOT_AUTHORIZED, e, e.getMessage());
} finally {
@@ -1028,13 +998,6 @@ public class FileSystemManager {
}
}
- /**
- * @param path
- * @param properties
- * @return
- * @throws URISyntaxException
- * @throws Exception
- */
public BrokerFileSystem getGooseFSFileSystem(String path, Map<String,
String> properties) {
WildcardURI pathUri = new WildcardURI(path);
// endpoint is the server host, pathUri.getUri().getHost() is the
bucket
@@ -1045,33 +1008,35 @@ public class FileSystemManager {
String password = properties.getOrDefault(PASSWORD_KEY, "");
String gfsUgi = username + "," + password;
FileSystemIdentity fileSystemIdentity = new FileSystemIdentity(host,
gfsUgi);
- BrokerFileSystem brokerFileSystem =
updateCachedFileSystem(fileSystemIdentity, properties);
- brokerFileSystem.getLock().lock();
+ BrokerFileSystem fileSystem =
updateCachedFileSystem(fileSystemIdentity, properties);
+ fileSystem.getLock().lock();
try {
- if (brokerFileSystem.getDFSFileSystem() == null) {
- logger.info("create goosefs client: " + path);
+ if (fileSystem.getDFSFileSystem() == null) {
+ logger.info("create gooseFs client: " + path);
Configuration conf = new Configuration();
for (Map.Entry<String, String> propElement :
properties.entrySet()) {
conf.set(propElement.getKey(), propElement.getValue());
}
- FileSystem fileSystem = FileSystem.get(pathUri.getUri(), conf);
- brokerFileSystem.setFileSystem(fileSystem);
+ FileSystem gooseFileSystem = FileSystem.get(pathUri.getUri(),
conf);
+ fileSystem.setFileSystem(gooseFileSystem);
}
- return brokerFileSystem;
+ return fileSystem;
} catch (Exception e) {
+ fileSystem.decrementActiveOperations();
logger.error("errors while connect to " + path, e);
throw new
BrokerException(TBrokerOperationStatusCode.NOT_AUTHORIZED, e);
} finally {
- brokerFileSystem.getLock().unlock();
+ fileSystem.getLock().unlock();
}
}
public List<TBrokerFileStatus> listLocatedFiles(String path, boolean
onlyFiles,
boolean recursive,
Map<String, String> properties) {
List<TBrokerFileStatus> resultFileStatus = null;
- BrokerFileSystem fileSystem = getFileSystem(path, properties);
+ BrokerFileSystem fileSystem = null;
Path locatedPath = new Path(path);
try {
+ fileSystem = getFileSystem(path, properties);
FileSystem innerFileSystem = fileSystem.getDFSFileSystem();
RemoteIterator<LocatedFileStatus> locatedFiles = onlyFiles ?
innerFileSystem.listFiles(locatedPath, recursive)
: innerFileSystem.listLocatedStatus(locatedPath);
@@ -1082,9 +1047,11 @@ public class FileSystemManager {
e, "file not found");
} catch (Exception e) {
logger.error("errors while get file status ", e);
- fileSystem.closeFileSystem();
+ if (fileSystem != null)
moveBrokerFileSystemToRecycleBin(fileSystem);
throw new
BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR,
e, "unknown error when listLocatedFiles");
+ } finally {
+ if (fileSystem != null) fileSystem.decrementActiveOperations();
}
}
@@ -1112,9 +1079,10 @@ public class FileSystemManager {
public List<TBrokerFileStatus> listPath(String path, boolean fileNameOnly,
Map<String, String> properties) {
List<TBrokerFileStatus> resultFileStatus = null;
WildcardURI pathUri = new WildcardURI(path);
- BrokerFileSystem fileSystem = getFileSystem(path, properties);
Path pathPattern = new Path(pathUri.getPath());
+ BrokerFileSystem fileSystem = null;
try {
+ fileSystem = getFileSystem(path, properties);
FileStatus[] files =
fileSystem.getDFSFileSystem().globStatus(pathPattern);
if (files == null) {
resultFileStatus = new ArrayList<>(0);
@@ -1147,24 +1115,29 @@ public class FileSystemManager {
e, "file not found");
} catch (Exception e) {
logger.error("errors while get file status ", e);
- fileSystem.closeFileSystem();
+ if (fileSystem != null)
moveBrokerFileSystemToRecycleBin(fileSystem);
throw new
BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR,
e, "unknown error when get file status");
+ } finally {
+ if (fileSystem != null) fileSystem.decrementActiveOperations();
}
return resultFileStatus;
}
public void deletePath(String path, Map<String, String> properties) {
WildcardURI pathUri = new WildcardURI(path);
- BrokerFileSystem fileSystem = getFileSystem(path, properties);
+ BrokerFileSystem fileSystem = null;
Path filePath = new Path(pathUri.getPath());
try {
+ fileSystem = getFileSystem(path, properties);
fileSystem.getDFSFileSystem().delete(filePath, true);
- } catch (IOException e) {
+ } catch (Exception e) {
logger.error("errors while delete path " + path);
- fileSystem.closeFileSystem();
+ if (fileSystem != null)
moveBrokerFileSystemToRecycleBin(fileSystem);
throw new
BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR,
e, "delete path {} error", path);
+ } finally {
+ if (fileSystem != null) fileSystem.decrementActiveOperations();
}
}
@@ -1175,54 +1148,63 @@ public class FileSystemManager {
throw new
BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR,
"only allow rename in same file system");
}
- BrokerFileSystem fileSystem = getFileSystem(srcPath, properties);
+ BrokerFileSystem fileSystem = null;
Path srcfilePath = new Path(srcPathUri.getPath());
Path destfilePath = new Path(destPathUri.getPath());
try {
+ fileSystem = getFileSystem(srcPath, properties);
boolean isRenameSuccess =
fileSystem.getDFSFileSystem().rename(srcfilePath, destfilePath);
if (!isRenameSuccess) {
throw new
BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR,
"failed to rename path from {} to {}", srcPath,
destPath);
}
- } catch (IOException e) {
+ } catch (Exception e) {
logger.error("errors while rename path from " + srcPath + " to " +
destPath);
- fileSystem.closeFileSystem();
+ if (fileSystem != null)
moveBrokerFileSystemToRecycleBin(fileSystem);
throw new
BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR,
e, "errors while rename {} to {}", srcPath, destPath);
+ } finally {
+ if (fileSystem != null) fileSystem.decrementActiveOperations();
}
}
public boolean checkPathExist(String path, Map<String, String> properties)
{
WildcardURI pathUri = new WildcardURI(path);
- BrokerFileSystem fileSystem = getFileSystem(path, properties);
+ BrokerFileSystem fileSystem = null;
Path filePath = new Path(pathUri.getPath());
try {
+ fileSystem = getFileSystem(path, properties);
boolean isPathExist =
fileSystem.getDFSFileSystem().exists(filePath);
return isPathExist;
- } catch (IOException e) {
+ } catch (Exception e) {
logger.error("errors while check path exist: " + path);
- fileSystem.closeFileSystem();
+ if (fileSystem != null)
moveBrokerFileSystemToRecycleBin(fileSystem);
throw new
BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR,
e, "errors while check if path {} exist", path);
+ } finally {
+ if (fileSystem != null) fileSystem.decrementActiveOperations();
}
}
public TBrokerFD openReader(String clientId, String path, long
startOffset, Map<String, String> properties) {
WildcardURI pathUri = new WildcardURI(path);
Path inputFilePath = new Path(pathUri.getPath());
- BrokerFileSystem fileSystem = getFileSystem(path, properties);
+ BrokerFileSystem fileSystem = null;
try {
+ fileSystem = getFileSystem(path, properties);
FSDataInputStream fsDataInputStream =
fileSystem.getDFSFileSystem().open(inputFilePath, readBufferSize);
fsDataInputStream.seek(startOffset);
UUID uuid = UUID.randomUUID();
TBrokerFD fd = parseUUIDToFD(uuid);
clientContextManager.putNewInputStream(clientId, fd,
fsDataInputStream, fileSystem);
return fd;
- } catch (IOException e) {
+ } catch (Exception e) {
logger.error("errors while open path", e);
- fileSystem.closeFileSystem();
+ if (fileSystem != null)
moveBrokerFileSystemToRecycleBin(fileSystem);
throw new
BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR,
e, "could not open file {}", path);
+ } finally {
+ if (fileSystem != null) fileSystem.decrementActiveOperations();
}
}
@@ -1310,19 +1292,22 @@ public class FileSystemManager {
public TBrokerFD openWriter(String clientId, String path, Map<String,
String> properties) {
WildcardURI pathUri = new WildcardURI(path);
Path inputFilePath = new Path(pathUri.getPath());
- BrokerFileSystem fileSystem = getFileSystem(path, properties);
+ BrokerFileSystem fileSystem = null;
try {
+ fileSystem = getFileSystem(path, properties);
FSDataOutputStream fsDataOutputStream =
fileSystem.getDFSFileSystem().create(inputFilePath,
true, writeBufferSize);
UUID uuid = UUID.randomUUID();
TBrokerFD fd = parseUUIDToFD(uuid);
clientContextManager.putNewOutputStream(clientId, fd,
fsDataOutputStream, fileSystem);
return fd;
- } catch (IOException e) {
+ } catch (Exception e) {
logger.error("errors while open path", e);
- fileSystem.closeFileSystem();
+ if (fileSystem != null)
moveBrokerFileSystemToRecycleBin(fileSystem);
throw new
BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR,
e, "could not open file {}", path);
+ } finally {
+ if (fileSystem != null) fileSystem.decrementActiveOperations();
}
}
@@ -1385,55 +1370,121 @@ public class FileSystemManager {
* 2. For other authentication modes, the lastAccessTime is used to
determine whether it has expired
*/
private BrokerFileSystem updateCachedFileSystem(FileSystemIdentity
fileSystemIdentity, Map<String, String> properties) {
- BrokerFileSystem brokerFileSystem;
- if (cachedFileSystem.containsKey(fileSystemIdentity)) {
- brokerFileSystem = cachedFileSystem.get(fileSystemIdentity);
+ return cachedFileSystem.compute(fileSystemIdentity, (key,
brokerFileSystemInMap) -> {
+ if (brokerFileSystemInMap == null) {
+ BrokerFileSystem newBrokerFileSystem = new
BrokerFileSystem(fileSystemIdentity);
+ newBrokerFileSystem.incrementActiveOperations();
+ return newBrokerFileSystem;
+ }
if (UserGroupInformation.isSecurityEnabled()) {
try {
UserGroupInformation.getCurrentUser().checkTGTAndReloginFromKeytab();
} catch (Exception e) {
logger.error("errors while refresh TGT: ", e);
}
- } else if (brokerFileSystem.isExpiredByLastAccessTime()) {
- brokerFileSystem.getLock().lock();
- BrokerFileSystem bfs =
cachedFileSystem.get(fileSystemIdentity);
- if (!bfs.isExpiredByLastAccessTime()) {
- return bfs;
- }
- try {
- logger.info("file system " + brokerFileSystem + " is
expired, update it.");
- brokerFileSystem.closeFileSystem();
- } catch (Throwable t) {
- logger.error("errors while close file system: ", t);
- } finally {
- brokerFileSystem.getLock().unlock();
- }
- brokerFileSystem = new BrokerFileSystem(fileSystemIdentity);
- cachedFileSystem.put(fileSystemIdentity, brokerFileSystem);
+ } else if (brokerFileSystemInMap.isExpiredByLastAccessTime()) {
+ logger.info("file system " + brokerFileSystemInMap + " is
expired, move to recycle bin and update it.");
+ fileSystemRecycleBin.add(brokerFileSystemInMap);
+ BrokerFileSystem newBrokerFileSystem = new
BrokerFileSystem(fileSystemIdentity);
+ newBrokerFileSystem.incrementActiveOperations();
+ return newBrokerFileSystem;
}
- } else {
- brokerFileSystem = new BrokerFileSystem(fileSystemIdentity);
- cachedFileSystem.put(fileSystemIdentity, brokerFileSystem);
- }
- return brokerFileSystem;
+ brokerFileSystemInMap.incrementActiveOperations();
+ brokerFileSystemInMap.updateLastUpdateAccessTime();
+ return brokerFileSystemInMap;
+ });
}
public long fileSize(String path, Map<String, String> properties) {
WildcardURI pathUri = new WildcardURI(path);
- BrokerFileSystem fileSystem = getFileSystem(path, properties);
+ BrokerFileSystem fileSystem = null;
Path filePath = new Path(pathUri.getPath());
try {
+ fileSystem = getFileSystem(path, properties);
FileStatus fileStatus =
fileSystem.getDFSFileSystem().getFileStatus(filePath);
if (fileStatus.isDirectory()) {
throw new
BrokerException(TBrokerOperationStatusCode.INVALID_INPUT_FILE_PATH,
"not a file: {}", path);
}
return fileStatus.getLen();
- } catch (IOException e) {
+ } catch (BrokerException e) {
+ // Used to handle exceptions generated by fileStatus.isDirectory
above.
+ if (e.errorCode !=
TBrokerOperationStatusCode.INVALID_INPUT_FILE_PATH) {
+ if (fileSystem != null)
moveBrokerFileSystemToRecycleBin(fileSystem);
+ }
logger.error("errors while getting file size: " + path);
- fileSystem.closeFileSystem();
+ throw new
BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR,
+ e, "errors while getting file size {}", path);
+ } catch (Exception e) {
+ logger.error("errors while getting file size: " + path);
+ if (fileSystem != null)
moveBrokerFileSystemToRecycleBin(fileSystem);
throw new
BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR,
e, "errors while getting file size {}", path);
+ } finally {
+ if (fileSystem != null) fileSystem.decrementActiveOperations();
+ }
+ }
+
+ private void moveBrokerFileSystemToRecycleBin(BrokerFileSystem
brokerFileSystem) {
+ if (brokerFileSystem == null) {
+ return;
+ }
+ cachedFileSystem.compute(brokerFileSystem.getIdentity(), (k, v)-> {
+ if (brokerFileSystem == v) {
+ fileSystemRecycleBin.add(brokerFileSystem);
+ return null;
+ }
+ // this brokerFileSystem has been move to fileSystemRecycleBin by
another thread, do nothing.
+ return v;
+ });
+ }
+
+ class CheckBrokerFileSystemExpirationTask implements Runnable {
+ @Override
+ public void run() {
+ UUID firstStillActiveFileSystemId = null;
+ BrokerFileSystem fs;
+ while ((fs = fileSystemRecycleBin.poll()) != null) {
+ fs.getLock().lock();
+ try {
+ UUID fileSystemId = fs.getFileSystemId();
+ // When retrieving the first fs that has been put back
into the queue, break the loop.
+ if (firstStillActiveFileSystemId != null &&
firstStillActiveFileSystemId.equals(fileSystemId)) {
+ if (!ifSafeThenCloseFileSystem(fs)) {
+ fileSystemRecycleBin.add(fs);
+ }
+ break;
+ }
+ if (!ifSafeThenCloseFileSystem(fs)) {
+ if (firstStillActiveFileSystemId == null) {
+ firstStillActiveFileSystemId = fileSystemId;
+ }
+ fileSystemRecycleBin.add(fs);
+ }
+ } catch (IOException e) {
+ // Fs.dfsFileSystem has been set to null, no need to add
it back to the end of the queue.
+ logger.warn("IO error while close file system: " + fs, e);
+ } catch (Exception e) {
+ // Should not enter this branch! defensive programming
+ fileSystemRecycleBin.add(fs);
+ logger.error("unexpected errors while close file system,
please check the code: " + fs, e);
+ } finally {
+ fs.getLock().unlock();
+ }
+ }
+ }
+
+ private boolean ifSafeThenCloseFileSystem(BrokerFileSystem fs) throws
IOException {
+ if (fs.getActiveOperationsCount() == 0 &&
fs.getActiveStreamCount() == 0) {
+ fs.closeFileSystem();
+ logger.info(fs + " is expired, remove it. " +
+ "last access time is " + fs.getLastAccessTimestamp());
+ return true;
+ } else {
+ logger.info(fs + " has expired but still has
activeOperationCount:"+ fs.getActiveOperationsCount() +
+ " and activeStreamCount:" + fs.getActiveStreamCount()+ ",
wait for the next round to close.");
+ return false;
+ }
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]