Repository: beam Updated Branches: refs/heads/master ede77c1b5 -> 41de9301a
[BEAM-1465] No natural place to flush/close resources in FileBasedWriter Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2b6a08f6 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2b6a08f6 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2b6a08f6 Branch: refs/heads/master Commit: 2b6a08f6cb37a9c51d9ae5a63e6b69a310bb3aae Parents: ede77c1 Author: Aviem Zur <aviem...@gmail.com> Authored: Wed Feb 22 06:26:38 2017 +0200 Committer: Dan Halperin <dhalp...@google.com> Committed: Wed Feb 22 12:37:30 2017 -0800 ---------------------------------------------------------------------- .../src/main/java/org/apache/beam/sdk/io/AvroIO.java | 2 +- .../main/java/org/apache/beam/sdk/io/FileBasedSink.java | 11 +++++++++++ .../src/main/java/org/apache/beam/sdk/io/TextIO.java | 11 +++++++---- 3 files changed, 19 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/2b6a08f6/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java index 01a4cba..388d9f0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java @@ -1032,7 +1032,7 @@ public class AvroIO { } @Override - protected void writeFooter() throws Exception { + protected void finishWrite() throws Exception { dataFileWriter.flush(); } } http://git-wip-us.apache.org/repos/asf/beam/blob/2b6a08f6/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java index 32b8b4f..e14ba59 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java @@ -590,6 +590,12 @@ public abstract class FileBasedSink<T> extends Sink<T> { protected void writeFooter() throws Exception {} /** + * Called after all calls to {@link #writeHeader}, {@link #write} and {@link #writeFooter}. + * If any resources opened in the write processes need to be flushed, flush them here. + */ + protected void finishWrite() throws Exception {} + + /** * Opens the channel. */ @Override @@ -630,6 +636,11 @@ public abstract class FileBasedSink<T> extends Sink<T> { try (WritableByteChannel theChannel = channel) { LOG.debug("Writing footer to {}.", filename); writeFooter(); + LOG.debug("Finishing write to {}.", filename); + finishWrite(); + if (!channel.isOpen()) { + throw new IllegalStateException("Channel should only be closed by its owner: " + channel); + } } FileResult result = new FileResult(filename); LOG.debug("Result for bundle {}: {}", this.id, filename); http://git-wip-us.apache.org/repos/asf/beam/blob/2b6a08f6/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java index 726411c..86e6989 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java @@ -1101,15 +1101,18 @@ public class TextIO { } @Override + public void write(String value) throws Exception { + writeLine(value); + } + + @Override protected void writeFooter() throws Exception { writeIfNotNull(footer); - // Flush here because there is currently no other natural place to do this. [BEAM-1465] - out.flush(); } @Override - public void write(String value) throws Exception { - writeLine(value); + protected void finishWrite() throws Exception { + out.flush(); } } }