This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 13cb81a724 [fix](broker) Fix bug that heavy broker load may fail due
to BrokerException which indicates the fd is not owned by client (#16350)
13cb81a724 is described below
commit 13cb81a7243e634827ffbfcc9649ee3fa13192a1
Author: caiconghui <[email protected]>
AuthorDate: Fri Feb 3 15:06:45 2023 +0800
[fix](broker) Fix bug that heavy broker load may fail due to
BrokerException which indicates the fd is not owned by client (#16350)
Co-authored-by: caiconghui1 <[email protected]>
---
.../apache/doris/broker/hdfs/BrokerFileSystem.java | 2 +-
.../doris/broker/hdfs/ClientContextManager.java | 23 +++++++++++++++-------
2 files changed, 17 insertions(+), 8 deletions(-)
diff --git
a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerFileSystem.java
b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerFileSystem.java
index c8a217d205..22f6925d31 100644
---
a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerFileSystem.java
+++
b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerFileSystem.java
@@ -31,7 +31,7 @@ public class BrokerFileSystem {
private ReentrantLock lock;
private FileSystemIdentity identity;
private FileSystem dfsFileSystem;
- private long lastAccessTimestamp;
+ private volatile long lastAccessTimestamp;
private long createTimestamp;
private UUID fileSystemId;
diff --git
a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/ClientContextManager.java
b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/ClientContextManager.java
index e52f248e11..736f9ae448 100644
---
a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/ClientContextManager.java
+++
b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/ClientContextManager.java
@@ -131,7 +131,7 @@ public class ClientContextManager {
public void run() {
try {
for (ClientResourceContext clientContext :
clientContexts.values()) {
- if (System.currentTimeMillis() -
clientContext.lastPingTimestamp > clientExpirationSeconds * 1000) {
+ if (System.currentTimeMillis() -
clientContext.lastAccessTimestamp > clientExpirationSeconds * 1000) {
for (TBrokerFD fd :
clientContext.inputStreams.keySet()) {
ClientContextManager.this.removeInputStream(fd);
}
@@ -139,9 +139,9 @@ public class ClientContextManager {
ClientContextManager.this.removeOutputStream(fd);
}
clientContexts.remove(clientContext.clientId);
- logger.info("client [" + clientContext.clientId
- + "] is expired, remove it from contexts. last
ping time is "
- + clientContext.lastPingTimestamp);
+ logger.info("client [" + clientContext.clientId
+ + "] is expired, remove it from contexts. last
access time is "
+ + clientContext.lastAccessTimestamp);
}
}
} finally {
@@ -197,24 +197,28 @@ public class ClientContextManager {
private String clientId;
private ConcurrentHashMap<TBrokerFD, BrokerInputStream> inputStreams;
private ConcurrentHashMap<TBrokerFD, BrokerOutputStream> outputStreams;
- private long lastPingTimestamp;
+
+ private volatile long lastAccessTimestamp;
public ClientResourceContext(String clientId) {
this.clientId = clientId;
this.inputStreams = new ConcurrentHashMap<>();
this.outputStreams = new ConcurrentHashMap<>();
- this.lastPingTimestamp = System.currentTimeMillis();
+ this.lastAccessTimestamp = System.currentTimeMillis();
}
public void putInputStream(TBrokerFD fd, FSDataInputStream
inputStream, BrokerFileSystem fileSystem) {
+ updateLastAccessTime();
inputStreams.putIfAbsent(fd, new BrokerInputStream(inputStream,
fileSystem));
}
public void putOutputStream(TBrokerFD fd, FSDataOutputStream
outputStream, BrokerFileSystem fileSystem) {
+ updateLastAccessTime();
outputStreams.putIfAbsent(fd, new BrokerOutputStream(outputStream,
fileSystem));
}
public FSDataInputStream getInputStream(TBrokerFD fd) {
+ updateLastAccessTime();
BrokerInputStream brokerInputStream = inputStreams.get(fd);
if (brokerInputStream != null) {
return brokerInputStream.getInputStream();
@@ -223,6 +227,7 @@ public class ClientContextManager {
}
public FSDataOutputStream getOutputStream(TBrokerFD fd) {
+ updateLastAccessTime();
BrokerOutputStream brokerOutputStream = outputStreams.get(fd);
if (brokerOutputStream != null) {
return brokerOutputStream.getOutputStream();
@@ -230,8 +235,12 @@ public class ClientContextManager {
return null;
}
+ public void updateLastAccessTime() {
+ this.lastAccessTimestamp = System.currentTimeMillis();
+ }
+
public void updateLastPingTime() {
- this.lastPingTimestamp = System.currentTimeMillis();
+ this.lastAccessTimestamp = System.currentTimeMillis();
// Should we also update the underline filesystem? maybe it is
time cost
for (BrokerInputStream brokerInputStream : inputStreams.values()) {
brokerInputStream.updateLastUpdateAccessTime();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]