This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new 32c9cad [HUDI-840] Avoid blank file created by HoodieLogFormatWriter (#1567) 32c9cad is described below commit 32c9cad52c7eef1067f12867afc405a1624cef14 Author: hongdd <jn_...@163.com> AuthorDate: Tue Sep 29 23:02:15 2020 +0800 [HUDI-840] Avoid blank file created by HoodieLogFormatWriter (#1567) --- .../common/table/log/HoodieLogFormatWriter.java | 121 ++++++++++++--------- .../common/functional/TestHoodieLogFormat.java | 9 +- .../TestHoodieLogFormatAppendFailure.java | 6 + 3 files changed, 83 insertions(+), 53 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java index d9c23b6..8909477 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java @@ -54,6 +54,7 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer { private final String logWriteToken; private final String rolloverLogWriteToken; private FSDataOutputStream output; + private boolean closed = false; private static final String APPEND_UNAVAILABLE_EXCEPTION_MESSAGE = "not sufficiently replicated yet"; /** @@ -64,7 +65,7 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer { * @param sizeThreshold */ HoodieLogFormatWriter(FileSystem fs, HoodieLogFile logFile, Integer bufferSize, Short replication, Long sizeThreshold, - String logWriteToken, String rolloverLogWriteToken) throws IOException, InterruptedException { + String logWriteToken, String rolloverLogWriteToken) { this.fs = fs; this.logFile = logFile; this.sizeThreshold = sizeThreshold; @@ -73,40 +74,6 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer { this.logWriteToken = logWriteToken; this.rolloverLogWriteToken = rolloverLogWriteToken; 
addShutDownHook(); - Path path = logFile.getPath(); - if (fs.exists(path)) { - boolean isAppendSupported = StorageSchemes.isAppendSupported(fs.getScheme()); - if (isAppendSupported) { - LOG.info(logFile + " exists. Appending to existing file"); - try { - this.output = fs.append(path, bufferSize); - } catch (RemoteException e) { - LOG.warn("Remote Exception, attempting to handle or recover lease", e); - handleAppendExceptionOrRecoverLease(path, e); - } catch (IOException ioe) { - if (ioe.getMessage().toLowerCase().contains("not supported")) { - // may still happen if scheme is viewfs. - isAppendSupported = false; - } else { - /* - * Before throwing an exception, close the outputstream, - * to ensure that the lease on the log file is released. - */ - close(); - throw ioe; - } - } - } - if (!isAppendSupported) { - this.logFile = logFile.rollOver(fs, rolloverLogWriteToken); - LOG.info("Append not supported.. Rolling over to " + logFile); - createNewFile(); - } - } else { - LOG.info(logFile + " does not exist. Create a new file"); - // Block size does not matter as we will always manually autoflush - createNewFile(); - } } public FileSystem getFs() { @@ -122,16 +89,64 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer { return sizeThreshold; } + /** + * Lazily opens the output stream if needed for writing. + * @return OutputStream for writing to current log file. + * @throws IOException + * @throws InterruptedException + */ + private FSDataOutputStream getOutputStream() throws IOException, InterruptedException { + if (this.output == null) { + Path path = logFile.getPath(); + if (fs.exists(path)) { + boolean isAppendSupported = StorageSchemes.isAppendSupported(fs.getScheme()); + if (isAppendSupported) { + LOG.info(logFile + " exists. 
Appending to existing file"); + try { + this.output = fs.append(path, bufferSize); + } catch (RemoteException e) { + LOG.warn("Remote Exception, attempting to handle or recover lease", e); + handleAppendExceptionOrRecoverLease(path, e); + } catch (IOException ioe) { + if (ioe.getMessage().toLowerCase().contains("not supported")) { + // may still happen if scheme is viewfs. + isAppendSupported = false; + } else { + /* + * Before throwing an exception, close the outputstream, + * to ensure that the lease on the log file is released. + */ + close(); + throw ioe; + } + } + } + if (!isAppendSupported) { + this.logFile = logFile.rollOver(fs, rolloverLogWriteToken); + LOG.info("Append not supported.. Rolling over to " + logFile); + createNewFile(); + } + } else { + LOG.info(logFile + " does not exist. Create a new file"); + // Block size does not matter as we will always manually autoflush + createNewFile(); + } + } + return output; + } + @Override public Writer appendBlock(HoodieLogBlock block) throws IOException, InterruptedException { // Find current version HoodieLogFormat.LogFormatVersion currentLogFormatVersion = new HoodieLogFormatVersion(HoodieLogFormat.CURRENT_VERSION); - long currentSize = this.output.size(); + + FSDataOutputStream outputStream = getOutputStream(); + long currentSize = outputStream.size(); // 1. Write the magic header for the start of the block - this.output.write(HoodieLogFormat.MAGIC); + outputStream.write(HoodieLogFormat.MAGIC); // bytes for header byte[] headerBytes = HoodieLogBlock.getLogMetadataBytes(block.getLogBlockHeader()); @@ -141,27 +156,27 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer { byte[] footerBytes = HoodieLogBlock.getLogMetadataBytes(block.getLogBlockFooter()); // 2. 
Write the total size of the block (excluding Magic) - this.output.writeLong(getLogBlockLength(content.length, headerBytes.length, footerBytes.length)); + outputStream.writeLong(getLogBlockLength(content.length, headerBytes.length, footerBytes.length)); // 3. Write the version of this log block - this.output.writeInt(currentLogFormatVersion.getVersion()); + outputStream.writeInt(currentLogFormatVersion.getVersion()); // 4. Write the block type - this.output.writeInt(block.getBlockType().ordinal()); + outputStream.writeInt(block.getBlockType().ordinal()); // 5. Write the headers for the log block - this.output.write(headerBytes); + outputStream.write(headerBytes); // 6. Write the size of the content block - this.output.writeLong(content.length); + outputStream.writeLong(content.length); // 7. Write the contents of the data block - this.output.write(content); + outputStream.write(content); // 8. Write the footers for the log block - this.output.write(footerBytes); + outputStream.write(footerBytes); // 9. Write the total size of the log block (including magic) which is everything written // until now (for reverse pointer) // Update: this information is now used in determining if a block is corrupt by comparing to the // block size in header. This change assumes that the block size will be the last data written // to a block. Read will break if any data is written past this point for a block. 
- this.output.writeLong(this.output.size() - currentSize); + outputStream.writeLong(outputStream.size() - currentSize); // Flush every block to disk flush(); @@ -207,9 +222,12 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer { @Override public void close() throws IOException { - flush(); - output.close(); - output = null; + if (output != null) { + flush(); + output.close(); + output = null; + closed = true; + } } private void flush() throws IOException { @@ -224,9 +242,13 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer { @Override public long getCurrentSize() throws IOException { - if (output == null) { + if (closed) { throw new IllegalStateException("Cannot get current size as the underlying stream has been closed already"); } + + if (output == null) { + return 0; + } return output.getPos(); } @@ -302,5 +324,4 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer { } } } - } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java index 1d7ca97..fa25e17 100755 --- a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java @@ -145,6 +145,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { assertEquals(size, fs.getFileStatus(writer.getLogFile().getPath()).getLen(), "Write should be auto-flushed. 
The size reported by FileStatus and the writer should match"); writer.close(); + } @ParameterizedTest @@ -174,6 +175,8 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { writer = writer.appendBlock(dataBlock); assertEquals(0, writer.getCurrentSize(), "This should be a new log file and hence size should be 0"); assertEquals(2, writer.getLogFile().getLogVersion(), "Version should be rolled to 2"); + Path logFilePath = writer.getLogFile().getPath(); + assertFalse(fs.exists(logFilePath), "Path (" + logFilePath + ") must not exist"); writer.close(); } @@ -216,16 +219,16 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { builder1 = builder1.withLogVersion(1).withRolloverLogWriteToken(HoodieLogFormat.UNKNOWN_WRITE_TOKEN); } Writer writer = builder1.build(); - Writer writer2 = builder2.build(); - HoodieLogFile logFile1 = writer.getLogFile(); - HoodieLogFile logFile2 = writer2.getLogFile(); List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100); Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>(); header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100"); header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString()); HoodieDataBlock dataBlock = getDataBlock(records, header); writer = writer.appendBlock(dataBlock); + Writer writer2 = builder2.build(); writer2 = writer2.appendBlock(dataBlock); + HoodieLogFile logFile1 = writer.getLogFile(); + HoodieLogFile logFile2 = writer2.getLogFile(); writer.close(); writer2.close(); assertNotNull(logFile1.getLogWriteToken()); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormatAppendFailure.java b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormatAppendFailure.java index 201ed4f..71616f6 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormatAppendFailure.java +++ 
b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormatAppendFailure.java @@ -22,6 +22,7 @@ import org.apache.hudi.common.model.HoodieArchivedLogFile; import org.apache.hudi.common.table.log.HoodieLogFormat; import org.apache.hudi.common.table.log.HoodieLogFormat.Writer; import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock; +import org.apache.hudi.common.table.log.block.HoodieCommandBlock; import org.apache.hudi.common.table.log.block.HoodieLogBlock; import org.apache.hudi.common.testutils.SchemaTestUtil; import org.apache.hudi.common.testutils.minicluster.MiniClusterUtil; @@ -139,6 +140,11 @@ public class TestHoodieLogFormatAppendFailure { writer = HoodieLogFormat.newWriterBuilder().onParentPath(testPath) .withFileExtension(HoodieArchivedLogFile.ARCHIVE_EXTENSION).withFileId("commits.archive") .overBaseCommit("").withFs(fs).build(); + header = new HashMap<>(); + header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE, + String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal())); + + writer.appendBlock(new HoodieCommandBlock(header)); // The log version should be different for this new writer assertNotEquals(writer.getLogFile().getLogVersion(), logFileVersion); }