This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 9bfb7ab8c42 Fix resource leak in KafkaIO GCS truststore file download (#37681)
9bfb7ab8c42 is described below

commit 9bfb7ab8c42ede75309e117d19d2a729ea550855
Author: ZIHAN DAI <[email protected]>
AuthorDate: Tue Feb 24 02:48:14 2026 +1100

    Fix resource leak in KafkaIO GCS truststore file download (#37681)
    
    Convert manual resource close to try-with-resources in
    identityOrGcsToLocalFile to prevent leaking ReadableByteChannel,
    FileOutputStream, and WritableByteChannel when an IOException
    occurs during the copy loop. Also preserve the original IOException
    as the cause of the IllegalArgumentException.
---
 .../io/kafka/KafkaReadSchemaTransformProvider.java | 31 +++++++++-------------
 1 file changed, 12 insertions(+), 19 deletions(-)

diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
index 74f9b147bbd..c5764b39bc6 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
@@ -402,31 +402,24 @@ public class KafkaReadSchemaTransformProvider
             LOG.info(
                 "Downloading {} into local filesystem ({})", configStr, 
localFile.toAbsolutePath());
             // TODO(pabloem): Only copy if file does not exist.
-            ReadableByteChannel channel =
-                
FileSystems.open(FileSystems.match(configStr).metadata().get(0).resourceId());
-            FileOutputStream outputStream = new 
FileOutputStream(localFile.toFile());
-
-            // Create a WritableByteChannel to write data to the 
FileOutputStream
-            WritableByteChannel outputChannel = 
Channels.newChannel(outputStream);
-
-            // Read data from the ReadableByteChannel and write it to the 
WritableByteChannel
-            ByteBuffer buffer = ByteBuffer.allocate(1024);
-            while (channel.read(buffer) != -1) {
-              buffer.flip();
-              outputChannel.write(buffer);
-              buffer.compact();
+            try (ReadableByteChannel channel =
+                    
FileSystems.open(FileSystems.match(configStr).metadata().get(0).resourceId());
+                FileOutputStream outputStream = new 
FileOutputStream(localFile.toFile());
+                WritableByteChannel outputChannel = 
Channels.newChannel(outputStream)) {
+              ByteBuffer buffer = ByteBuffer.allocate(1024);
+              while (channel.read(buffer) != -1) {
+                buffer.flip();
+                outputChannel.write(buffer);
+                buffer.compact();
+              }
             }
-
-            // Close the channels and the output stream
-            channel.close();
-            outputChannel.close();
-            outputStream.close();
             return localFile.toAbsolutePath().toString();
           } catch (IOException e) {
             throw new IllegalArgumentException(
                 String.format(
                     "Unable to fetch file %s to be used locally to create a 
Kafka Consumer.",
-                    configStr));
+                    configStr),
+                e);
           }
         } else {
           return configValue;

Reply via email to