This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new be67d61223 [ISSUE #9447] Avoiding possible resource leaks in 
InputStream (#9459)
be67d61223 is described below

commit be67d6122357fd10316820cdf3b7d5a35cf9c73f
Author: Xu Chengxin <[email protected]>
AuthorDate: Mon Jun 30 14:31:04 2025 +0800

    [ISSUE #9447] Avoiding possible resource leaks in InputStream (#9459)
---
 .../org/apache/rocketmq/broker/BrokerStartup.java  |   6 +-
 .../rocketmq/container/BrokerContainerStartup.java |   7 +-
 .../remoting/protocol/body/RegisterBrokerBody.java | 103 ++++++++++-----------
 .../rocketmq/test/util/DuplicateMessageInfo.java   |  18 ++--
 4 files changed, 66 insertions(+), 68 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java 
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
index 90006b087e..32c79febc6 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
@@ -294,10 +294,10 @@ public class BrokerStartup {
         }
 
         public Properties loadConfig() throws Exception {
-            InputStream in = new 
BufferedInputStream(Files.newInputStream(Paths.get(file)));
             Properties properties = new Properties();
-            properties.load(in);
-            in.close();
+            try (InputStream in = new 
BufferedInputStream(Files.newInputStream(Paths.get(file)))) {
+                properties.load(in);
+            }
             return properties;
         }
 
diff --git 
a/container/src/main/java/org/apache/rocketmq/container/BrokerContainerStartup.java
 
b/container/src/main/java/org/apache/rocketmq/container/BrokerContainerStartup.java
index f909e623b2..0a057a4246 100644
--- 
a/container/src/main/java/org/apache/rocketmq/container/BrokerContainerStartup.java
+++ 
b/container/src/main/java/org/apache/rocketmq/container/BrokerContainerStartup.java
@@ -405,10 +405,11 @@ public class BrokerContainerStartup {
         }
 
         public Properties loadConfig() throws Exception {
-            InputStream in = new BufferedInputStream(new 
FileInputStream(file));
             Properties properties = new Properties();
-            properties.load(in);
-            in.close();
+
+            try (InputStream in = new BufferedInputStream(new 
FileInputStream(file))) {
+                properties.load(in);
+            }
             return properties;
         }
 
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/RegisterBrokerBody.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/RegisterBrokerBody.java
index 99557b1d3f..7312b70233 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/RegisterBrokerBody.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/RegisterBrokerBody.java
@@ -54,11 +54,10 @@ public class RegisterBrokerBody extends 
RemotingSerializable {
         }
         long start = System.currentTimeMillis();
         ByteArrayOutputStream byteArrayOutputStream = new 
ByteArrayOutputStream();
-        DeflaterOutputStream outputStream = new 
DeflaterOutputStream(byteArrayOutputStream, new 
Deflater(Deflater.BEST_COMPRESSION));
-        DataVersion dataVersion = topicConfigSerializeWrapper.getDataVersion();
-        ConcurrentMap<String, TopicConfig> topicConfigTable = 
cloneTopicConfigTable(topicConfigSerializeWrapper.getTopicConfigTable());
-        assert topicConfigTable != null;
-        try {
+        try (DeflaterOutputStream outputStream = new 
DeflaterOutputStream(byteArrayOutputStream, new 
Deflater(Deflater.BEST_COMPRESSION))) {
+            DataVersion dataVersion = 
topicConfigSerializeWrapper.getDataVersion();
+            ConcurrentMap<String, TopicConfig> topicConfigTable = 
cloneTopicConfigTable(topicConfigSerializeWrapper.getTopicConfigTable());
+            assert topicConfigTable != null;
             byte[] buffer = dataVersion.encode();
 
             // write data version
@@ -117,58 +116,56 @@ public class RegisterBrokerBody extends 
RemotingSerializable {
             return RegisterBrokerBody.decode(data, RegisterBrokerBody.class);
         }
         long start = System.currentTimeMillis();
-        InflaterInputStream inflaterInputStream = new InflaterInputStream(new 
ByteArrayInputStream(data));
-        int dataVersionLength = readInt(inflaterInputStream);
-        byte[] dataVersionBytes = readBytes(inflaterInputStream, 
dataVersionLength);
-        DataVersion dataVersion = DataVersion.decode(dataVersionBytes, 
DataVersion.class);
-
-        RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();
-        
registerBrokerBody.getTopicConfigSerializeWrapper().setDataVersion(dataVersion);
-        ConcurrentMap<String, TopicConfig> topicConfigTable = 
registerBrokerBody.getTopicConfigSerializeWrapper().getTopicConfigTable();
-
-        int topicConfigNumber = readInt(inflaterInputStream);
-        LOGGER.debug("{} topic configs to extract", topicConfigNumber);
-
-        for (int i = 0; i < topicConfigNumber; i++) {
-            int topicConfigJsonLength = readInt(inflaterInputStream);
-
-            byte[] buffer = readBytes(inflaterInputStream, 
topicConfigJsonLength);
-            TopicConfig topicConfig = new TopicConfig();
-            String topicConfigJson = new String(buffer, 
MixAll.DEFAULT_CHARSET);
-            topicConfig.decode(topicConfigJson);
-            topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
-        }
-
-        int filterServerListJsonLength = readInt(inflaterInputStream);
-
-        byte[] filterServerListBuffer = readBytes(inflaterInputStream, 
filterServerListJsonLength);
-        String filterServerListJson = new String(filterServerListBuffer, 
MixAll.DEFAULT_CHARSET);
-        List<String> filterServerList = new ArrayList<>();
-        try {
-            filterServerList = JSON.parseArray(filterServerListJson, 
String.class);
-        } catch (Exception e) {
-            LOGGER.error("Decompressing occur Exception {}", 
filterServerListJson);
-        }
-
-        registerBrokerBody.setFilterServerList(filterServerList);
+        try (InflaterInputStream inflaterInputStream = new 
InflaterInputStream(new ByteArrayInputStream(data))) {
+            int dataVersionLength = readInt(inflaterInputStream);
+            byte[] dataVersionBytes = readBytes(inflaterInputStream, 
dataVersionLength);
+            DataVersion dataVersion = DataVersion.decode(dataVersionBytes, 
DataVersion.class);
+
+            RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();
+            
registerBrokerBody.getTopicConfigSerializeWrapper().setDataVersion(dataVersion);
+            ConcurrentMap<String, TopicConfig> topicConfigTable = 
registerBrokerBody.getTopicConfigSerializeWrapper().getTopicConfigTable();
+
+            int topicConfigNumber = readInt(inflaterInputStream);
+            LOGGER.debug("{} topic configs to extract", topicConfigNumber);
+
+            for (int i = 0; i < topicConfigNumber; i++) {
+                int topicConfigJsonLength = readInt(inflaterInputStream);
+                byte[] buffer = readBytes(inflaterInputStream, 
topicConfigJsonLength);
+                TopicConfig topicConfig = new TopicConfig();
+                String topicConfigJson = new String(buffer, 
MixAll.DEFAULT_CHARSET);
+                topicConfig.decode(topicConfigJson);
+                topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
+            }
 
-        if (brokerVersion.ordinal() >= MQVersion.Version.V5_0_0.ordinal()) {
-            int topicQueueMappingNum = readInt(inflaterInputStream);
-            Map<String/* topic */, TopicQueueMappingInfo> 
topicQueueMappingInfoMap = new ConcurrentHashMap<>();
-            for (int i = 0; i < topicQueueMappingNum; i++) {
-                int mappingJsonLen = readInt(inflaterInputStream);
-                byte[] buffer = readBytes(inflaterInputStream, mappingJsonLen);
-                TopicQueueMappingInfo info = 
TopicQueueMappingInfo.decode(buffer, TopicQueueMappingInfo.class);
-                topicQueueMappingInfoMap.put(info.getTopic(), info);
+            int filterServerListJsonLength = readInt(inflaterInputStream);
+            byte[] filterServerListBuffer = readBytes(inflaterInputStream, 
filterServerListJsonLength);
+            String filterServerListJson = new String(filterServerListBuffer, 
MixAll.DEFAULT_CHARSET);
+            List<String> filterServerList = new ArrayList<>();
+            try {
+                filterServerList = JSON.parseArray(filterServerListJson, 
String.class);
+            } catch (Exception e) {
+                LOGGER.error("Decompressing occur Exception {}", 
filterServerListJson, e);
+            }
+            registerBrokerBody.setFilterServerList(filterServerList);
+
+            if (brokerVersion.ordinal() >= MQVersion.Version.V5_0_0.ordinal()) 
{
+                int topicQueueMappingNum = readInt(inflaterInputStream);
+                Map<String, TopicQueueMappingInfo> topicQueueMappingInfoMap = 
new ConcurrentHashMap<>();
+                for (int i = 0; i < topicQueueMappingNum; i++) {
+                    int mappingJsonLen = readInt(inflaterInputStream);
+                    byte[] buffer = readBytes(inflaterInputStream, 
mappingJsonLen);
+                    TopicQueueMappingInfo info = 
TopicQueueMappingInfo.decode(buffer, TopicQueueMappingInfo.class);
+                    topicQueueMappingInfoMap.put(info.getTopic(), info);
+                }
+                
registerBrokerBody.getTopicConfigSerializeWrapper().setTopicQueueMappingInfoMap(topicQueueMappingInfoMap);
             }
-            
registerBrokerBody.getTopicConfigSerializeWrapper().setTopicQueueMappingInfoMap(topicQueueMappingInfoMap);
-        }
 
-        long takeTime = System.currentTimeMillis() - start;
-        if (takeTime > MINIMUM_TAKE_TIME_MILLISECOND) {
-            LOGGER.info("Decompressing takes {}ms", takeTime);
+            long takeTime = System.currentTimeMillis() - start;
+            if (takeTime > MINIMUM_TAKE_TIME_MILLISECOND) {
+                LOGGER.info("Decompressing takes {}ms", takeTime);
+            }
+            return registerBrokerBody;
         }
-        return registerBrokerBody;
     }
 
     private static byte[] convertIntToByteArray(int n) {
diff --git 
a/test/src/main/java/org/apache/rocketmq/test/util/DuplicateMessageInfo.java 
b/test/src/main/java/org/apache/rocketmq/test/util/DuplicateMessageInfo.java
index 8aa2840312..8073e00902 100644
--- a/test/src/main/java/org/apache/rocketmq/test/util/DuplicateMessageInfo.java
+++ b/test/src/main/java/org/apache/rocketmq/test/util/DuplicateMessageInfo.java
@@ -101,17 +101,17 @@ public class DuplicateMessageInfo<T> {
         if (bPrintLog) {
             String logFileNameStr = "D:" + File.separator + 
"checkDuplicatedMessageInfo.txt";
             File logFileNameFile = new File(logFileNameStr);
-            OutputStream out = new FileOutputStream(logFileNameFile, true);
+            try (OutputStream out = new FileOutputStream(logFileNameFile, 
true)) {
 
-            String strToWrite;
-            byte[] byteToWrite;
-            strToWrite = strBuilder + titleString;
-            for (int i = 0; i < msgListSize; i++)
-                strToWrite += strBQueue.get(i).toString() + "\r\n";
+                String strToWrite;
+                byte[] byteToWrite;
+                strToWrite = strBuilder + titleString;
+                for (int i = 0; i < msgListSize; i++)
+                    strToWrite += strBQueue.get(i).toString() + "\r\n";
 
-            byteToWrite = strToWrite.getBytes(StandardCharsets.UTF_8);
-            out.write(byteToWrite);
-            out.close();
+                byteToWrite = strToWrite.getBytes(StandardCharsets.UTF_8);
+                out.write(byteToWrite);
+            }
         }
     }
 

Reply via email to