This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 782bf654ee4c fix(common): Close log writer output stream on append failure (#18909)
782bf654ee4c is described below

commit 782bf654ee4cac9da7e1244da03d83b298bbbe3a
Author: fhan <[email protected]>
AuthorDate: Tue Jun 9 12:05:09 2026 +0800

    fix(common): Close log writer output stream on append failure (#18909)
    
    * fix(common): Close log writer output stream on append failure
    * simplify the exception handling
    
    ---------
    
    Co-authored-by: fhan <[email protected]>
    Co-authored-by: danny0405 <[email protected]>
---
 .../org/apache/hudi/exception/ExceptionUtil.java   |  18 +-
 .../apache/hudi/exception/TestExceptionUtil.java   |  20 +++
 .../common/table/log/HoodieLogFormatWriter.java    | 184 +++++++++++++--------
 .../table/log/TestHoodieLogFormatWriter.java       | 180 ++++++++++++++++++++
 4 files changed, 333 insertions(+), 69 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/exception/ExceptionUtil.java b/hudi-common/src/main/java/org/apache/hudi/exception/ExceptionUtil.java
index 92e2e3cc5356..40a39542c7a7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/exception/ExceptionUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/exception/ExceptionUtil.java
@@ -23,6 +23,8 @@ import org.apache.hudi.common.util.StringUtils;
 
 import javax.annotation.Nonnull;
 
+import java.io.IOException;
+
 /**
  * Util class for exception analysis.
  */
@@ -31,7 +33,7 @@ public final class ExceptionUtil {
   }
 
   /**
-   * Returns true if error message is contained in any nested exception of provided {@link Throwable}.
+   * Returns true if error message is contained in any nested exception to provided {@link Throwable}.
    */
   public static boolean validateErrorMsg(@Nonnull Throwable t, String errorMsg) {
     if (StringUtils.isNullOrEmpty(errorMsg)) {
@@ -48,4 +50,18 @@ public final class ExceptionUtil {
 
     return false;
   }
+
+  /**
+   * Throws the provided exception as-is when it is an {@link IOException} or
+   * {@link RuntimeException}, otherwise wraps it in an {@link IOException}.
+   */
+  public static void throwAsIOExceptionOrRuntimeException(Throwable exception) throws IOException {
+    if (exception instanceof IOException) {
+      throw (IOException) exception;
+    }
+    if (exception instanceof RuntimeException) {
+      throw (RuntimeException) exception;
+    }
+    throw new IOException(exception);
+  }
 }
diff --git a/hudi-common/src/test/java/org/apache/hudi/exception/TestExceptionUtil.java b/hudi-common/src/test/java/org/apache/hudi/exception/TestExceptionUtil.java
index dba850b1b907..c2244896d855 100644
--- a/hudi-common/src/test/java/org/apache/hudi/exception/TestExceptionUtil.java
+++ b/hudi-common/src/test/java/org/apache/hudi/exception/TestExceptionUtil.java
@@ -23,8 +23,11 @@ import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 
+import static org.apache.hudi.exception.ExceptionUtil.throwAsIOExceptionOrRuntimeException;
 import static org.apache.hudi.exception.ExceptionUtil.validateErrorMsg;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 class TestExceptionUtil {
@@ -54,4 +57,21 @@ class TestExceptionUtil {
     // Empty string should not be found in any message (including null)
     assertFalse(validateErrorMsg(exceptionWithoutMessage, ""));
   }
+
+  @Test
+  void testThrowAsIOExceptionOrRuntimeException() {
+    IOException ioException = new IOException("io");
+    IOException thrownIOException = assertThrows(IOException.class, () -> throwAsIOExceptionOrRuntimeException(ioException));
+    assertSame(ioException, thrownIOException);
+
+    RuntimeException runtimeException = new RuntimeException("runtime");
+    RuntimeException thrownRuntimeException =
+        assertThrows(RuntimeException.class, () -> throwAsIOExceptionOrRuntimeException(runtimeException));
+    assertSame(runtimeException, thrownRuntimeException);
+
+    Exception checkedException = new Exception("checked");
+    IOException wrappedException =
+        assertThrows(IOException.class, () -> throwAsIOExceptionOrRuntimeException(checkedException));
+    assertSame(checkedException, wrappedException.getCause());
+  }
 }
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
index 2aed1d7dd87a..fe24bd600f1e 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
@@ -22,6 +22,7 @@ package org.apache.hudi.common.table.log;
 import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.exception.ExceptionUtil;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
@@ -125,68 +126,73 @@ public class HoodieLogFormatWriter extends HoodieLogFormat.Writer {
 
   @Override
   public AppendResult appendBlocks(List<HoodieLogBlock> blocks) throws IOException {
-    // Find current version
-    HoodieLogFormat.LogFormatVersion currentLogFormatVersion =
-        new HoodieLogFormatVersion(HoodieLogFormat.CURRENT_VERSION);
-
-    FSDataOutputStream originalOutputStream = getOutputStream();
-    long startPos = originalOutputStream.getPos();
-    long sizeWritten = 0;
-    // HUDI-2655. here we wrap originalOutputStream to ensure huge blocks can be correctly written
-    FSDataOutputStream outputStream = new FSDataOutputStream(originalOutputStream, new FileSystem.Statistics(storage.getScheme()), startPos);
-    for (HoodieLogBlock block: blocks) {
-      long startSize = outputStream.size();
-
-      // 1. Write the magic header for the start of the block
-      outputStream.write(HoodieLogFormat.MAGIC);
-
-      // bytes for header
-      byte[] headerBytes = HoodieLogBlock.getHeaderMetadataBytes(block.getLogBlockHeader());
-      // content bytes
-      ByteArrayOutputStream content = block.getContentBytes(storage);
-      // bytes for footer
-      byte[] footerBytes = HoodieLogBlock.getFooterMetadataBytes(block.getLogBlockFooter());
-
-      // 2. Write the total size of the block (excluding Magic)
-      outputStream.writeLong(getLogBlockLength(content.size(), headerBytes.length, footerBytes.length));
-
-      // 3. Write the version of this log block
-      outputStream.writeInt(currentLogFormatVersion.getVersion());
-      // 4. Write the block type
-      outputStream.writeInt(block.getBlockType().ordinal());
-
-      // 5. Write the headers for the log block
-      outputStream.write(headerBytes);
-      // 6. Write the size of the content block
-      outputStream.writeLong(content.size());
-      // 7. Write the contents of the data block
-      content.writeTo(outputStream);
-      // 8. Write the footers for the log block
-      outputStream.write(footerBytes);
-      // 9. Write the total size of the log block (including magic) which is everything written
-      // until now (for reverse pointer)
-      // Update: this information is now used in determining if a block is corrupt by comparing to the
-      //   block size in header. This change assumes that the block size will be the last data written
-      //   to a block. Read will break if any data is written past this point for a block.
-      outputStream.writeLong(outputStream.size() - startSize);
-
-      // Fetch the size again, so it accounts also (9).
-
-      // HUDI-2655. Check the size written to avoid log blocks whose size overflow.
-      if (outputStream.size() == Integer.MAX_VALUE) {
-        throw new HoodieIOException("Blocks appended may overflow. Please decrease log block size or log block amount");
+    try {
+      // Find current version
+      HoodieLogFormat.LogFormatVersion currentLogFormatVersion =
+          new HoodieLogFormatVersion(HoodieLogFormat.CURRENT_VERSION);
+
+      FSDataOutputStream originalOutputStream = getOutputStream();
+      long startPos = originalOutputStream.getPos();
+      long sizeWritten = 0;
+      // HUDI-2655. here we wrap originalOutputStream to ensure huge blocks can be correctly written
+      FSDataOutputStream outputStream = new FSDataOutputStream(originalOutputStream, new FileSystem.Statistics(storage.getScheme()), startPos);
+      for (HoodieLogBlock block: blocks) {
+        long startSize = outputStream.size();
+
+        // 1. Write the magic header for the start of the block
+        outputStream.write(HoodieLogFormat.MAGIC);
+
+        // bytes for header
+        byte[] headerBytes = HoodieLogBlock.getHeaderMetadataBytes(block.getLogBlockHeader());
+        // content bytes
+        ByteArrayOutputStream content = block.getContentBytes(storage);
+        // bytes for footer
+        byte[] footerBytes = HoodieLogBlock.getFooterMetadataBytes(block.getLogBlockFooter());
+
+        // 2. Write the total size of the block (excluding Magic)
+        outputStream.writeLong(getLogBlockLength(content.size(), headerBytes.length, footerBytes.length));
+
+        // 3. Write the version of this log block
+        outputStream.writeInt(currentLogFormatVersion.getVersion());
+        // 4. Write the block type
+        outputStream.writeInt(block.getBlockType().ordinal());
+
+        // 5. Write the headers for the log block
+        outputStream.write(headerBytes);
+        // 6. Write the size of the content block
+        outputStream.writeLong(content.size());
+        // 7. Write the contents of the data block
+        content.writeTo(outputStream);
+        // 8. Write the footers for the log block
+        outputStream.write(footerBytes);
+        // 9. Write the total size of the log block (including magic) which is everything written
+        // until now (for reverse pointer)
+        // Update: this information is now used in determining if a block is corrupt by comparing to the
+        //   block size in header. This change assumes that the block size will be the last data written
+        //   to a block. Read will break if any data is written past this point for a block.
+        outputStream.writeLong(outputStream.size() - startSize);
+
+        // Fetch the size again, so it accounts also (9).
+
+        // HUDI-2655. Check the size written to avoid log blocks whose size overflow.
+        if (outputStream.size() == Integer.MAX_VALUE) {
+          throw new HoodieIOException("Blocks appended may overflow. Please decrease log block size or log block amount");
+        }
+        sizeWritten +=  outputStream.size() - startSize;
       }
-      sizeWritten +=  outputStream.size() - startSize;
+      // No flush/hsync here: append-time visibility is not part of the contract.
+      // Downstream readers only need commit-level visibility, which is provided
+      // when the writer is closed (see closeStream) or when callers explicitly
+      // invoke sync().
+
+      AppendResult result = new AppendResult(logFile, startPos, sizeWritten);
+      // roll over if size is past the threshold
+      rolloverIfNeeded();
+      return result;
+    } catch (IOException | RuntimeException e) {
+      closeOutputStreamOnAppendFailure(e);
+      throw e;
     }
-    // No flush/hsync here: append-time visibility is not part of the contract.
-    // Downstream readers only need commit-level visibility, which is provided
-    // when the writer is closed (see closeStream) or when callers explicitly
-    // invoke sync().
-
-    AppendResult result = new AppendResult(logFile, startPos, sizeWritten);
-    // roll over if size is past the threshold
-    rolloverIfNeeded();
-    return result;
   }
 
   /**
@@ -237,21 +243,63 @@ public class HoodieLogFormatWriter extends HoodieLogFormat.Writer {
 
   @Override
   public void close() throws IOException {
-    closeStream();
-    // remove the shutdown hook after closing the stream to avoid memory leaks
-    if (null != shutdownThread) {
-      Runtime.getRuntime().removeShutdownHook(shutdownThread);
+    try {
+      closeStream();
+    } finally {
+      // remove the shutdown hook after closing the stream to avoid memory leaks
+      if (null != shutdownThread) {
+        Runtime.getRuntime().removeShutdownHook(shutdownThread);
+        shutdownThread = null;
+      }
     }
   }
 
   private void closeStream() throws IOException {
-    if (outputStream != null) {
+    if (outputStream == null) {
+      return;
+    }
+
+    Throwable failure = null;
+    try {
       // Persist all buffered data to DataNodes before closing so downstream
       // readers can observe a fully-written log file at commit-level visibility.
       sync();
-      outputStream.close();
-      outputStream = null;
-      closed = true;
+    } catch (IOException | RuntimeException e) {
+      failure = e;
+    }
+
+    try {
+      closeOutputStream();
+    } catch (IOException | RuntimeException closeException) {
+      if (failure != null) {
+        failure.addSuppressed(closeException);
+      } else {
+        failure = closeException;
+      }
+    }
+
+    if (failure != null) {
+      ExceptionUtil.throwAsIOExceptionOrRuntimeException(failure);
+    }
+  }
+
+  private void closeOutputStreamOnAppendFailure(Throwable failure) {
+    try {
+      closeOutputStream();
+    } catch (IOException | RuntimeException closeException) {
+      failure.addSuppressed(closeException);
+      log.warn("Failed to close output stream after append failure for log file {}", logFile, closeException);
+    }
+  }
+
+  private void closeOutputStream() throws IOException {
+    if (outputStream != null) {
+      try {
+        outputStream.close();
+      } finally {
+        outputStream = null;
+        closed = true;
+      }
     }
   }
 
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/log/TestHoodieLogFormatWriter.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/log/TestHoodieLogFormatWriter.java
new file mode 100644
index 000000000000..15316012a97a
--- /dev/null
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/log/TestHoodieLogFormatWriter.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.log;
+
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class TestHoodieLogFormatWriter {
+
+  private static final String WRITE_FAIL = "write-fail";
+  private static final String CLOSE_FAIL = "close-fail";
+  private static final String SYNC_FAIL = "sync-fail";
+
+  @TempDir
+  java.nio.file.Path tempDir;
+
+  @Test
+  void testCloseOutputOnAppendWriteException() throws IOException {
+    HoodieStorage storage = HoodieTestUtils.getStorage(tempDir.toString());
+    HoodieLogFormatWriter writer = newWriter(storage);
+    try {
+      CloseTrackingOutputStream outputStream = new CloseTrackingOutputStream(true, false);
+      writer.withOutputStream(newFSDataOutputStream(outputStream, storage));
+
+      IOException exception = assertThrows(IOException.class, () -> writer.appendBlock(commandBlock()));
+
+      assertEquals(WRITE_FAIL, exception.getMessage());
+      assertTrue(outputStream.isClosed());
+      assertThrows(IllegalStateException.class, writer::getCurrentSize);
+    } finally {
+      writer.close();
+    }
+  }
+
+  @Test
+  void testPreserveAppendExceptionWhenCloseFails() throws IOException {
+    HoodieStorage storage = HoodieTestUtils.getStorage(tempDir.toString());
+    HoodieLogFormatWriter writer = newWriter(storage);
+    try {
+      CloseTrackingOutputStream outputStream = new CloseTrackingOutputStream(true, true);
+      writer.withOutputStream(newFSDataOutputStream(outputStream, storage));
+
+      IOException exception = assertThrows(IOException.class, () -> writer.appendBlock(commandBlock()));
+
+      assertEquals(WRITE_FAIL, exception.getMessage());
+      assertTrue(outputStream.isClosed());
+      assertEquals(1, exception.getSuppressed().length);
+      assertEquals(CLOSE_FAIL, exception.getSuppressed()[0].getMessage());
+      assertThrows(IllegalStateException.class, writer::getCurrentSize);
+    } finally {
+      writer.close();
+    }
+  }
+
+  @Test
+  void testCloseOutputWhenSyncFailsOnClose() throws IOException {
+    HoodieStorage storage = HoodieTestUtils.getStorage(tempDir.toString());
+    HoodieLogFormatWriter writer = newWriter(storage);
+    try {
+      CloseTrackingOutputStream outputStream = new CloseTrackingOutputStream(false, false);
+      writer.withOutputStream(new SyncFailingFSDataOutputStream(outputStream, storage));
+
+      IOException exception = assertThrows(IOException.class, writer::close);
+
+      assertEquals(SYNC_FAIL, exception.getMessage());
+      assertTrue(outputStream.isClosed());
+      assertThrows(IllegalStateException.class, writer::getCurrentSize);
+    } finally {
+      writer.close();
+    }
+  }
+
+  private HoodieLogFormatWriter newWriter(HoodieStorage storage) throws IOException {
+    return HoodieLogFormatWriter.builder()
+        .withParentPath(new StoragePath(tempDir.toString()))
+        .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+        .withLogFileId("test-fileid")
+        .withInstantTime("100")
+        .withLogVersion(1)
+        .withStorage(storage)
+        .build();
+  }
+
+  private HoodieCommandBlock commandBlock() {
+    Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
+    header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE,
+        String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_BLOCK.ordinal()));
+    return new HoodieCommandBlock(header);
+  }
+
+  private FSDataOutputStream newFSDataOutputStream(CloseTrackingOutputStream outputStream, HoodieStorage storage)
+      throws IOException {
+    return new FSDataOutputStream(outputStream, new FileSystem.Statistics(storage.getScheme()));
+  }
+
+  private static class CloseTrackingOutputStream extends OutputStream {
+
+    private final boolean failOnWrite;
+    private final boolean failOnClose;
+    private boolean closed;
+
+    private CloseTrackingOutputStream(boolean failOnWrite, boolean failOnClose) {
+      this.failOnWrite = failOnWrite;
+      this.failOnClose = failOnClose;
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+      if (failOnWrite) {
+        throw new IOException(WRITE_FAIL);
+      }
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+      if (failOnWrite) {
+        throw new IOException(WRITE_FAIL);
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+      closed = true;
+      if (failOnClose) {
+        throw new IOException(CLOSE_FAIL);
+      }
+    }
+
+    private boolean isClosed() {
+      return closed;
+    }
+  }
+
+  private static class SyncFailingFSDataOutputStream extends FSDataOutputStream {
+
+    private SyncFailingFSDataOutputStream(CloseTrackingOutputStream outputStream, HoodieStorage storage)
+        throws IOException {
+      super(outputStream, new FileSystem.Statistics(storage.getScheme()));
+    }
+
+    @Override
+    public void hsync() throws IOException {
+      throw new IOException(SYNC_FAIL);
+    }
+  }
+}

Reply via email to