shanthoosh commented on code in PR #1717:
URL: https://github.com/apache/samza/pull/1717#discussion_r2205617046
##########
samza-core/src/main/java/org/apache/samza/container/host/PosixCommandBasedStatisticsGetter.java:
##########
@@ -44,20 +47,71 @@ public class PosixCommandBasedStatisticsGetter implements SystemStatisticsGetter
   private List<String> getAllCommandOutput(String[] cmdArray) throws IOException {
     log.debug("Executing commands {}", Arrays.toString(cmdArray));
     Process executable = Runtime.getRuntime().exec(cmdArray);
-    BufferedReader processReader;
     List<String> psOutput = new ArrayList<>();
-    processReader = new BufferedReader(new InputStreamReader(executable.getInputStream()));
-    String line;
-    while ((line = processReader.readLine()) != null) {
-      if (!line.isEmpty()) {
-        psOutput.add(line);
+    try (BufferedReader processReader = new BufferedReader(new InputStreamReader(executable.getInputStream()));
+         BufferedReader errorReader = new BufferedReader(new InputStreamReader(executable.getErrorStream()))) {
+
+      // Read output stream
+      String line;
+      while ((line = processReader.readLine()) != null) {
+        if (!line.isEmpty()) {
+          psOutput.add(line);
+        }
+      }
+
+      // Consume error stream to prevent blocking
+      consumeErrorStream(errorReader, cmdArray);
+
+      // Wait for the process to complete to prevent resource leak
+      try {
+        boolean finished = executable.waitFor(COMMAND_TIMEOUT_SECONDS, TimeUnit.SECONDS);
Review Comment:
Do we want to make this timeout configurable per job?
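
To make the question concrete, here is a minimal sketch of what a per-job override could look like. Everything in it is an assumption for illustration: the config key name, the default of 60 seconds (the PR's actual `COMMAND_TIMEOUT_SECONDS` value is not shown in this hunk), and the use of a plain `Map<String, String>` as a stand-in for the job config.

```java
import java.util.Map;

public class CommandTimeoutConfigSketch {
  // Hypothetical key name, not an existing Samza config.
  static final String TIMEOUT_KEY = "container.host.statistics.command.timeout.seconds";
  // Assumed fallback; the real COMMAND_TIMEOUT_SECONDS value is not visible in the diff.
  static final long DEFAULT_TIMEOUT_SECONDS = 60L;

  // Returns the configured timeout, or the default when the value is missing, non-numeric, or not positive.
  static long commandTimeoutSeconds(Map<String, String> jobConfig) {
    String raw = jobConfig.get(TIMEOUT_KEY);
    if (raw == null || raw.trim().isEmpty()) {
      return DEFAULT_TIMEOUT_SECONDS;
    }
    try {
      long parsed = Long.parseLong(raw.trim());
      return parsed > 0 ? parsed : DEFAULT_TIMEOUT_SECONDS;
    } catch (NumberFormatException e) {
      return DEFAULT_TIMEOUT_SECONDS;
    }
  }

  public static void main(String[] args) {
    System.out.println(commandTimeoutSeconds(Map.of(TIMEOUT_KEY, "15")));
    System.out.println(commandTimeoutSeconds(Map.of()));
  }
}
```

Falling back to the default on bad input keeps a misconfigured job from disabling the timeout entirely; whether that is preferable to failing fast is a separate design choice.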
##########
samza-core/src/main/java/org/apache/samza/container/host/PosixCommandBasedStatisticsGetter.java:
##########
@@ -44,20 +47,71 @@ public class PosixCommandBasedStatisticsGetter implements SystemStatisticsGetter
   private List<String> getAllCommandOutput(String[] cmdArray) throws IOException {
     log.debug("Executing commands {}", Arrays.toString(cmdArray));
     Process executable = Runtime.getRuntime().exec(cmdArray);
-    BufferedReader processReader;
     List<String> psOutput = new ArrayList<>();
-    processReader = new BufferedReader(new InputStreamReader(executable.getInputStream()));
-    String line;
-    while ((line = processReader.readLine()) != null) {
-      if (!line.isEmpty()) {
-        psOutput.add(line);
+    try (BufferedReader processReader = new BufferedReader(new InputStreamReader(executable.getInputStream()));
+         BufferedReader errorReader = new BufferedReader(new InputStreamReader(executable.getErrorStream()))) {
+
+      // Read output stream
+      String line;
+      while ((line = processReader.readLine()) != null) {
+        if (!line.isEmpty()) {
+          psOutput.add(line);
+        }
+      }
+
+      // Consume error stream to prevent blocking
+      consumeErrorStream(errorReader, cmdArray);
+
+      // Wait for the process to complete to prevent resource leak
+      try {
+        boolean finished = executable.waitFor(COMMAND_TIMEOUT_SECONDS, TimeUnit.SECONDS);
Review Comment:
What will be the behavior if the executable does not complete within the timeout?
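
For context on this question: `Process.waitFor(long, TimeUnit)` returns `false` when the timeout elapses and does not terminate the child, so the process keeps running unless the caller destroys it. Also, as the hunk reads, the `readLine()` loop runs before `waitFor`, so the timeout would seem to bound only the wait after the child has closed its stdout. Below is a minimal sketch of one way to handle the `false` case. The helper name `waitOrKill` and the 5-second grace period are hypothetical and not taken from the PR, and the example assumes a POSIX `sleep` binary is on the path.

```java
import java.io.IOException;
import java.util.concurrent.TimeUnit;

public class WaitForTimeoutSketch {
  // Hypothetical helper: waits up to timeoutSeconds, and kills the child if it has not exited by then.
  // Returns true if the process exited on its own within the timeout.
  static boolean waitOrKill(Process process, long timeoutSeconds) throws InterruptedException {
    boolean finished = process.waitFor(timeoutSeconds, TimeUnit.SECONDS);
    if (!finished) {
      // waitFor returned false: the child is still running, so terminate it explicitly.
      process.destroyForcibly();
      // Bounded wait so the killed child can be reaped; the 5 seconds is an arbitrary choice.
      process.waitFor(5, TimeUnit.SECONDS);
    }
    return finished;
  }

  public static void main(String[] args) throws IOException, InterruptedException {
    // A command that deliberately outlives the 1-second timeout.
    Process slow = new ProcessBuilder("sleep", "30").start();
    boolean finished = waitOrKill(slow, 1);
    System.out.println("finished=" + finished + ", alive=" + slow.isAlive());
  }
}
```

Whether the caller should then return the partial output, return an empty list, or throw is the behavior this comment is asking the PR to spell out.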