This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 32c9cad  [HUDI-840] Avoid blank file created by HoodieLogFormatWriter (#1567)
32c9cad is described below

commit 32c9cad52c7eef1067f12867afc405a1624cef14
Author: hongdd <jn_...@163.com>
AuthorDate: Tue Sep 29 23:02:15 2020 +0800

    [HUDI-840] Avoid blank file created by HoodieLogFormatWriter (#1567)
---
 .../common/table/log/HoodieLogFormatWriter.java    | 121 ++++++++++++---------
 .../common/functional/TestHoodieLogFormat.java     |   9 +-
 .../TestHoodieLogFormatAppendFailure.java          |   6 +
 3 files changed, 83 insertions(+), 53 deletions(-)
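
In short, the fix moves the open/create logic for the log file out of the HoodieLogFormatWriter constructor and into a lazily evaluated getOutputStream() helper, so the file system is not touched until the first block is appended and a writer that never writes leaves no blank log file behind. The sketch below illustrates that lazy-initialization pattern in isolation; it is a simplified, hypothetical example using plain java.io rather than the Hadoop FileSystem API, and the class name LazyLogWriter is invented for illustration (rollover, lease recovery and the append-support check in the real code are omitted):

    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;

    // Hypothetical illustration of the pattern: the file is created only when the
    // first block is written, so constructing and closing a writer without writing
    // leaves no empty file behind.
    public class LazyLogWriter implements AutoCloseable {
      private final File logFile;
      private OutputStream output;      // stays null until the first append
      private boolean closed = false;

      public LazyLogWriter(File logFile) {
        this.logFile = logFile;         // the constructor no longer touches the file system
      }

      // Lazily opens the output stream, mirroring getOutputStream() in the diff below.
      private OutputStream getOutputStream() throws IOException {
        if (output == null) {
          output = new FileOutputStream(logFile, true);  // append mode; creates the file on first use
        }
        return output;
      }

      public void appendBlock(byte[] block) throws IOException {
        OutputStream out = getOutputStream();
        out.write(block);
        out.flush();                    // flush every block, as the real writer does
      }

      // Simplified analogue of getCurrentSize(): 0 before the stream is opened,
      // and an error only once the writer has actually been closed.
      public long getCurrentSize() {
        if (closed) {
          throw new IllegalStateException("Cannot get current size: stream already closed");
        }
        return output == null ? 0 : logFile.length();
      }

      @Override
      public void close() throws IOException {
        if (output != null) {           // nothing to close if nothing was ever written
          output.close();
          output = null;
          closed = true;
        }
      }
    }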

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
index d9c23b6..8909477 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
@@ -54,6 +54,7 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
   private final String logWriteToken;
   private final String rolloverLogWriteToken;
   private FSDataOutputStream output;
+  private boolean closed = false;
+  private static final String APPEND_UNAVAILABLE_EXCEPTION_MESSAGE = "not sufficiently replicated yet";
 
   /**
@@ -64,7 +65,7 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
    * @param sizeThreshold
    */
   HoodieLogFormatWriter(FileSystem fs, HoodieLogFile logFile, Integer bufferSize, Short replication, Long sizeThreshold,
-      String logWriteToken, String rolloverLogWriteToken) throws IOException, InterruptedException {
+      String logWriteToken, String rolloverLogWriteToken) {
     this.fs = fs;
     this.logFile = logFile;
     this.sizeThreshold = sizeThreshold;
@@ -73,40 +74,6 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
     this.logWriteToken = logWriteToken;
     this.rolloverLogWriteToken = rolloverLogWriteToken;
     addShutDownHook();
-    Path path = logFile.getPath();
-    if (fs.exists(path)) {
-      boolean isAppendSupported = StorageSchemes.isAppendSupported(fs.getScheme());
-      if (isAppendSupported) {
-        LOG.info(logFile + " exists. Appending to existing file");
-        try {
-          this.output = fs.append(path, bufferSize);
-        } catch (RemoteException e) {
-          LOG.warn("Remote Exception, attempting to handle or recover lease", e);
-          handleAppendExceptionOrRecoverLease(path, e);
-        } catch (IOException ioe) {
-          if (ioe.getMessage().toLowerCase().contains("not supported")) {
-            // may still happen if scheme is viewfs.
-            isAppendSupported = false;
-          } else {
-            /*
-             * Before throwing an exception, close the outputstream,
-             * to ensure that the lease on the log file is released.
-             */
-            close();
-            throw ioe;
-          }
-        }
-      }
-      if (!isAppendSupported) {
-        this.logFile = logFile.rollOver(fs, rolloverLogWriteToken);
-        LOG.info("Append not supported.. Rolling over to " + logFile);
-        createNewFile();
-      }
-    } else {
-      LOG.info(logFile + " does not exist. Create a new file");
-      // Block size does not matter as we will always manually autoflush
-      createNewFile();
-    }
   }
 
   public FileSystem getFs() {
@@ -122,16 +89,64 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
     return sizeThreshold;
   }
 
+  /**
+   * Lazily opens the output stream if needed for writing.
+   * @return OutputStream for writing to current log file.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private FSDataOutputStream getOutputStream() throws IOException, InterruptedException {
+    if (this.output == null) {
+      Path path = logFile.getPath();
+      if (fs.exists(path)) {
+        boolean isAppendSupported = StorageSchemes.isAppendSupported(fs.getScheme());
+        if (isAppendSupported) {
+          LOG.info(logFile + " exists. Appending to existing file");
+          try {
+            this.output = fs.append(path, bufferSize);
+          } catch (RemoteException e) {
+            LOG.warn("Remote Exception, attempting to handle or recover lease", e);
+            handleAppendExceptionOrRecoverLease(path, e);
+          } catch (IOException ioe) {
+            if (ioe.getMessage().toLowerCase().contains("not supported")) {
+              // may still happen if scheme is viewfs.
+              isAppendSupported = false;
+            } else {
+              /*
+               * Before throwing an exception, close the outputstream,
+               * to ensure that the lease on the log file is released.
+               */
+              close();
+              throw ioe;
+            }
+          }
+        }
+        if (!isAppendSupported) {
+          this.logFile = logFile.rollOver(fs, rolloverLogWriteToken);
+          LOG.info("Append not supported.. Rolling over to " + logFile);
+          createNewFile();
+        }
+      } else {
+        LOG.info(logFile + " does not exist. Create a new file");
+        // Block size does not matter as we will always manually autoflush
+        createNewFile();
+      }
+    }
+    return output;
+  }
+
   @Override
   public Writer appendBlock(HoodieLogBlock block) throws IOException, InterruptedException {
 
     // Find current version
     HoodieLogFormat.LogFormatVersion currentLogFormatVersion =
         new HoodieLogFormatVersion(HoodieLogFormat.CURRENT_VERSION);
-    long currentSize = this.output.size();
+
+    FSDataOutputStream outputStream = getOutputStream();
+    long currentSize = outputStream.size();
 
     // 1. Write the magic header for the start of the block
-    this.output.write(HoodieLogFormat.MAGIC);
+    outputStream.write(HoodieLogFormat.MAGIC);
 
     // bytes for header
     byte[] headerBytes = HoodieLogBlock.getLogMetadataBytes(block.getLogBlockHeader());
@@ -141,27 +156,27 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
     byte[] footerBytes = HoodieLogBlock.getLogMetadataBytes(block.getLogBlockFooter());
 
     // 2. Write the total size of the block (excluding Magic)
-    this.output.writeLong(getLogBlockLength(content.length, headerBytes.length, footerBytes.length));
+    outputStream.writeLong(getLogBlockLength(content.length, headerBytes.length, footerBytes.length));
 
     // 3. Write the version of this log block
-    this.output.writeInt(currentLogFormatVersion.getVersion());
+    outputStream.writeInt(currentLogFormatVersion.getVersion());
     // 4. Write the block type
-    this.output.writeInt(block.getBlockType().ordinal());
+    outputStream.writeInt(block.getBlockType().ordinal());
 
     // 5. Write the headers for the log block
-    this.output.write(headerBytes);
+    outputStream.write(headerBytes);
     // 6. Write the size of the content block
-    this.output.writeLong(content.length);
+    outputStream.writeLong(content.length);
     // 7. Write the contents of the data block
-    this.output.write(content);
+    outputStream.write(content);
     // 8. Write the footers for the log block
-    this.output.write(footerBytes);
+    outputStream.write(footerBytes);
     // 9. Write the total size of the log block (including magic) which is everything written
     // until now (for reverse pointer)
     // Update: this information is now used in determining if a block is corrupt by comparing to the
     //   block size in header. This change assumes that the block size will be the last data written
     //   to a block. Read will break if any data is written past this point for a block.
-    this.output.writeLong(this.output.size() - currentSize);
+    outputStream.writeLong(outputStream.size() - currentSize);
     // Flush every block to disk
     flush();
 
@@ -207,9 +222,12 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
 
   @Override
   public void close() throws IOException {
-    flush();
-    output.close();
-    output = null;
+    if (output != null) {
+      flush();
+      output.close();
+      output = null;
+      closed = true;
+    }
   }
 
   private void flush() throws IOException {
@@ -224,9 +242,13 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
 
   @Override
   public long getCurrentSize() throws IOException {
-    if (output == null) {
+    if (closed) {
       throw new IllegalStateException("Cannot get current size as the underlying stream has been closed already");
     }
+
+    if (output == null) {
+      return 0;
+    }
     return output.getPos();
   }
 
@@ -302,5 +324,4 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
       }
     }
   }
-
 }
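
The observable effect, which the test changes below assert, is that a writer built and closed without appending any block leaves no file on disk, and getCurrentSize() reports 0 (rather than failing) while the stream has not yet been opened. A minimal usage sketch against the hypothetical LazyLogWriter above (not the actual Hudi test code, and assuming the file did not exist beforehand):

    import java.io.File;
    import java.io.IOException;

    public class LazyLogWriterDemo {
      public static void main(String[] args) throws IOException {
        File logFile = new File("example.log");
        try (LazyLogWriter writer = new LazyLogWriter(logFile)) {
          // size is 0 because the stream has not been opened yet
          System.out.println("size before any append: " + writer.getCurrentSize());
          // no appendBlock() call, so the file is never created
        }
        System.out.println("file exists after close without append: " + logFile.exists());  // false
      }
    }
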
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
index 1d7ca97..fa25e17 100755
--- a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
@@ -145,6 +145,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     assertEquals(size, fs.getFileStatus(writer.getLogFile().getPath()).getLen(),
         "Write should be auto-flushed. The size reported by FileStatus and the writer should match");
     writer.close();
+
   }
 
   @ParameterizedTest
@@ -174,6 +175,8 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     writer = writer.appendBlock(dataBlock);
     assertEquals(0, writer.getCurrentSize(), "This should be a new log file and hence size should be 0");
     assertEquals(2, writer.getLogFile().getLogVersion(), "Version should be rolled to 2");
+    Path logFilePath = writer.getLogFile().getPath();
+    assertFalse(fs.exists(logFilePath), "Path (" + logFilePath + ") must not exist");
     writer.close();
   }
 
@@ -216,16 +219,16 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
       builder1 = builder1.withLogVersion(1).withRolloverLogWriteToken(HoodieLogFormat.UNKNOWN_WRITE_TOKEN);
     }
     Writer writer = builder1.build();
-    Writer writer2 = builder2.build();
-    HoodieLogFile logFile1 = writer.getLogFile();
-    HoodieLogFile logFile2 = writer2.getLogFile();
     List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
     Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
     HoodieDataBlock dataBlock = getDataBlock(records, header);
     writer = writer.appendBlock(dataBlock);
+    Writer writer2 = builder2.build();
     writer2 = writer2.appendBlock(dataBlock);
+    HoodieLogFile logFile1 = writer.getLogFile();
+    HoodieLogFile logFile2 = writer2.getLogFile();
     writer.close();
     writer2.close();
     assertNotNull(logFile1.getLogWriteToken());
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormatAppendFailure.java b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormatAppendFailure.java
index 201ed4f..71616f6 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormatAppendFailure.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormatAppendFailure.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.model.HoodieArchivedLogFile;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
 import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.testutils.SchemaTestUtil;
 import org.apache.hudi.common.testutils.minicluster.MiniClusterUtil;
@@ -139,6 +140,11 @@ public class TestHoodieLogFormatAppendFailure {
     writer = HoodieLogFormat.newWriterBuilder().onParentPath(testPath)
         .withFileExtension(HoodieArchivedLogFile.ARCHIVE_EXTENSION).withFileId("commits.archive")
         .overBaseCommit("").withFs(fs).build();
+    header = new HashMap<>();
+    header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE,
+        String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
+
+    writer.appendBlock(new HoodieCommandBlock(header));
     // The log version should be different for this new writer
     assertNotEquals(writer.getLogFile().getLogVersion(), logFileVersion);
   }
