This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new be67d61223 [ISSUE #9447] Avoiding possible resource leaks in InputStream (#9459)
be67d61223 is described below
commit be67d6122357fd10316820cdf3b7d5a35cf9c73f
Author: Xu Chengxin <[email protected]>
AuthorDate: Mon Jun 30 14:31:04 2025 +0800
[ISSUE #9447] Avoiding possible resource leaks in InputStream (#9459)
---
.../org/apache/rocketmq/broker/BrokerStartup.java | 6 +-
.../rocketmq/container/BrokerContainerStartup.java | 7 +-
.../remoting/protocol/body/RegisterBrokerBody.java | 103 ++++++++++-----------
.../rocketmq/test/util/DuplicateMessageInfo.java | 18 ++--
4 files changed, 66 insertions(+), 68 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
index 90006b087e..32c79febc6 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
@@ -294,10 +294,10 @@ public class BrokerStartup {
}
public Properties loadConfig() throws Exception {
- InputStream in = new
BufferedInputStream(Files.newInputStream(Paths.get(file)));
Properties properties = new Properties();
- properties.load(in);
- in.close();
+ try (InputStream in = new
BufferedInputStream(Files.newInputStream(Paths.get(file)))) {
+ properties.load(in);
+ }
return properties;
}
diff --git a/container/src/main/java/org/apache/rocketmq/container/BrokerContainerStartup.java b/container/src/main/java/org/apache/rocketmq/container/BrokerContainerStartup.java
index f909e623b2..0a057a4246 100644
--- a/container/src/main/java/org/apache/rocketmq/container/BrokerContainerStartup.java
+++ b/container/src/main/java/org/apache/rocketmq/container/BrokerContainerStartup.java
@@ -405,10 +405,11 @@ public class BrokerContainerStartup {
}
public Properties loadConfig() throws Exception {
- InputStream in = new BufferedInputStream(new
FileInputStream(file));
Properties properties = new Properties();
- properties.load(in);
- in.close();
+
+ try (InputStream in = new BufferedInputStream(new
FileInputStream(file))) {
+ properties.load(in);
+ }
return properties;
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/RegisterBrokerBody.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/RegisterBrokerBody.java
index 99557b1d3f..7312b70233 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/RegisterBrokerBody.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/RegisterBrokerBody.java
@@ -54,11 +54,10 @@ public class RegisterBrokerBody extends RemotingSerializable {
}
long start = System.currentTimeMillis();
ByteArrayOutputStream byteArrayOutputStream = new
ByteArrayOutputStream();
- DeflaterOutputStream outputStream = new
DeflaterOutputStream(byteArrayOutputStream, new
Deflater(Deflater.BEST_COMPRESSION));
- DataVersion dataVersion = topicConfigSerializeWrapper.getDataVersion();
- ConcurrentMap<String, TopicConfig> topicConfigTable =
cloneTopicConfigTable(topicConfigSerializeWrapper.getTopicConfigTable());
- assert topicConfigTable != null;
- try {
+ try (DeflaterOutputStream outputStream = new
DeflaterOutputStream(byteArrayOutputStream, new
Deflater(Deflater.BEST_COMPRESSION))) {
+ DataVersion dataVersion =
topicConfigSerializeWrapper.getDataVersion();
+ ConcurrentMap<String, TopicConfig> topicConfigTable =
cloneTopicConfigTable(topicConfigSerializeWrapper.getTopicConfigTable());
+ assert topicConfigTable != null;
byte[] buffer = dataVersion.encode();
// write data version
@@ -117,58 +116,56 @@ public class RegisterBrokerBody extends RemotingSerializable {
return RegisterBrokerBody.decode(data, RegisterBrokerBody.class);
}
long start = System.currentTimeMillis();
- InflaterInputStream inflaterInputStream = new InflaterInputStream(new
ByteArrayInputStream(data));
- int dataVersionLength = readInt(inflaterInputStream);
- byte[] dataVersionBytes = readBytes(inflaterInputStream,
dataVersionLength);
- DataVersion dataVersion = DataVersion.decode(dataVersionBytes,
DataVersion.class);
-
- RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();
-
registerBrokerBody.getTopicConfigSerializeWrapper().setDataVersion(dataVersion);
- ConcurrentMap<String, TopicConfig> topicConfigTable =
registerBrokerBody.getTopicConfigSerializeWrapper().getTopicConfigTable();
-
- int topicConfigNumber = readInt(inflaterInputStream);
- LOGGER.debug("{} topic configs to extract", topicConfigNumber);
-
- for (int i = 0; i < topicConfigNumber; i++) {
- int topicConfigJsonLength = readInt(inflaterInputStream);
-
- byte[] buffer = readBytes(inflaterInputStream,
topicConfigJsonLength);
- TopicConfig topicConfig = new TopicConfig();
- String topicConfigJson = new String(buffer,
MixAll.DEFAULT_CHARSET);
- topicConfig.decode(topicConfigJson);
- topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
- }
-
- int filterServerListJsonLength = readInt(inflaterInputStream);
-
- byte[] filterServerListBuffer = readBytes(inflaterInputStream,
filterServerListJsonLength);
- String filterServerListJson = new String(filterServerListBuffer,
MixAll.DEFAULT_CHARSET);
- List<String> filterServerList = new ArrayList<>();
- try {
- filterServerList = JSON.parseArray(filterServerListJson,
String.class);
- } catch (Exception e) {
- LOGGER.error("Decompressing occur Exception {}",
filterServerListJson);
- }
-
- registerBrokerBody.setFilterServerList(filterServerList);
+ try (InflaterInputStream inflaterInputStream = new
InflaterInputStream(new ByteArrayInputStream(data))) {
+ int dataVersionLength = readInt(inflaterInputStream);
+ byte[] dataVersionBytes = readBytes(inflaterInputStream,
dataVersionLength);
+ DataVersion dataVersion = DataVersion.decode(dataVersionBytes,
DataVersion.class);
+
+ RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();
+
registerBrokerBody.getTopicConfigSerializeWrapper().setDataVersion(dataVersion);
+ ConcurrentMap<String, TopicConfig> topicConfigTable =
registerBrokerBody.getTopicConfigSerializeWrapper().getTopicConfigTable();
+
+ int topicConfigNumber = readInt(inflaterInputStream);
+ LOGGER.debug("{} topic configs to extract", topicConfigNumber);
+
+ for (int i = 0; i < topicConfigNumber; i++) {
+ int topicConfigJsonLength = readInt(inflaterInputStream);
+ byte[] buffer = readBytes(inflaterInputStream,
topicConfigJsonLength);
+ TopicConfig topicConfig = new TopicConfig();
+ String topicConfigJson = new String(buffer,
MixAll.DEFAULT_CHARSET);
+ topicConfig.decode(topicConfigJson);
+ topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
+ }
- if (brokerVersion.ordinal() >= MQVersion.Version.V5_0_0.ordinal()) {
- int topicQueueMappingNum = readInt(inflaterInputStream);
- Map<String/* topic */, TopicQueueMappingInfo>
topicQueueMappingInfoMap = new ConcurrentHashMap<>();
- for (int i = 0; i < topicQueueMappingNum; i++) {
- int mappingJsonLen = readInt(inflaterInputStream);
- byte[] buffer = readBytes(inflaterInputStream, mappingJsonLen);
- TopicQueueMappingInfo info =
TopicQueueMappingInfo.decode(buffer, TopicQueueMappingInfo.class);
- topicQueueMappingInfoMap.put(info.getTopic(), info);
+ int filterServerListJsonLength = readInt(inflaterInputStream);
+ byte[] filterServerListBuffer = readBytes(inflaterInputStream,
filterServerListJsonLength);
+ String filterServerListJson = new String(filterServerListBuffer,
MixAll.DEFAULT_CHARSET);
+ List<String> filterServerList = new ArrayList<>();
+ try {
+ filterServerList = JSON.parseArray(filterServerListJson,
String.class);
+ } catch (Exception e) {
+ LOGGER.error("Decompressing occur Exception {}",
filterServerListJson, e);
+ }
+ registerBrokerBody.setFilterServerList(filterServerList);
+
+ if (brokerVersion.ordinal() >= MQVersion.Version.V5_0_0.ordinal())
{
+ int topicQueueMappingNum = readInt(inflaterInputStream);
+ Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap =
new ConcurrentHashMap<>();
+ for (int i = 0; i < topicQueueMappingNum; i++) {
+ int mappingJsonLen = readInt(inflaterInputStream);
+ byte[] buffer = readBytes(inflaterInputStream,
mappingJsonLen);
+ TopicQueueMappingInfo info =
TopicQueueMappingInfo.decode(buffer, TopicQueueMappingInfo.class);
+ topicQueueMappingInfoMap.put(info.getTopic(), info);
+ }
+
registerBrokerBody.getTopicConfigSerializeWrapper().setTopicQueueMappingInfoMap(topicQueueMappingInfoMap);
}
-
registerBrokerBody.getTopicConfigSerializeWrapper().setTopicQueueMappingInfoMap(topicQueueMappingInfoMap);
- }
- long takeTime = System.currentTimeMillis() - start;
- if (takeTime > MINIMUM_TAKE_TIME_MILLISECOND) {
- LOGGER.info("Decompressing takes {}ms", takeTime);
+ long takeTime = System.currentTimeMillis() - start;
+ if (takeTime > MINIMUM_TAKE_TIME_MILLISECOND) {
+ LOGGER.info("Decompressing takes {}ms", takeTime);
+ }
+ return registerBrokerBody;
}
- return registerBrokerBody;
}
private static byte[] convertIntToByteArray(int n) {
diff --git a/test/src/main/java/org/apache/rocketmq/test/util/DuplicateMessageInfo.java b/test/src/main/java/org/apache/rocketmq/test/util/DuplicateMessageInfo.java
index 8aa2840312..8073e00902 100644
--- a/test/src/main/java/org/apache/rocketmq/test/util/DuplicateMessageInfo.java
+++ b/test/src/main/java/org/apache/rocketmq/test/util/DuplicateMessageInfo.java
@@ -101,17 +101,17 @@ public class DuplicateMessageInfo<T> {
if (bPrintLog) {
String logFileNameStr = "D:" + File.separator +
"checkDuplicatedMessageInfo.txt";
File logFileNameFile = new File(logFileNameStr);
- OutputStream out = new FileOutputStream(logFileNameFile, true);
+ try (OutputStream out = new FileOutputStream(logFileNameFile,
true)) {
- String strToWrite;
- byte[] byteToWrite;
- strToWrite = strBuilder + titleString;
- for (int i = 0; i < msgListSize; i++)
- strToWrite += strBQueue.get(i).toString() + "\r\n";
+ String strToWrite;
+ byte[] byteToWrite;
+ strToWrite = strBuilder + titleString;
+ for (int i = 0; i < msgListSize; i++)
+ strToWrite += strBQueue.get(i).toString() + "\r\n";
- byteToWrite = strToWrite.getBytes(StandardCharsets.UTF_8);
- out.write(byteToWrite);
- out.close();
+ byteToWrite = strToWrite.getBytes(StandardCharsets.UTF_8);
+ out.write(byteToWrite);
+ }
}
}