Repository: beam
Updated Branches:
  refs/heads/master ede77c1b5 -> 41de9301a


[BEAM-1465] No natural place to flush/close resources in FileBasedWriter


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2b6a08f6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2b6a08f6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2b6a08f6

Branch: refs/heads/master
Commit: 2b6a08f6cb37a9c51d9ae5a63e6b69a310bb3aae
Parents: ede77c1
Author: Aviem Zur <aviem...@gmail.com>
Authored: Wed Feb 22 06:26:38 2017 +0200
Committer: Dan Halperin <dhalp...@google.com>
Committed: Wed Feb 22 12:37:30 2017 -0800

----------------------------------------------------------------------
 .../src/main/java/org/apache/beam/sdk/io/AvroIO.java     |  2 +-
 .../main/java/org/apache/beam/sdk/io/FileBasedSink.java  | 11 +++++++++++
 .../src/main/java/org/apache/beam/sdk/io/TextIO.java     | 11 +++++++----
 3 files changed, 19 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/2b6a08f6/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 01a4cba..388d9f0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -1032,7 +1032,7 @@ public class AvroIO {
       }
 
       @Override
-      protected void writeFooter() throws Exception {
+      protected void finishWrite() throws Exception {
         dataFileWriter.flush();
       }
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/2b6a08f6/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 32b8b4f..e14ba59 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -590,6 +590,12 @@ public abstract class FileBasedSink<T> extends Sink<T> {
     protected void writeFooter() throws Exception {}
 
     /**
+     * Called after all calls to {@link #writeHeader}, {@link #write} and {@link #writeFooter}.
+     * If any resources opened in the write processes need to be flushed, flush them here.
+     */
+    protected void finishWrite() throws Exception {}
+
+    /**
      * Opens the channel.
      */
     @Override
@@ -630,6 +636,11 @@ public abstract class FileBasedSink<T> extends Sink<T> {
       try (WritableByteChannel theChannel = channel) {
         LOG.debug("Writing footer to {}.", filename);
         writeFooter();
+        LOG.debug("Finishing write to {}.", filename);
+        finishWrite();
+        if (!channel.isOpen()) {
+          throw new IllegalStateException("Channel should only be closed by its owner: " + channel);
+        }
       }
       FileResult result = new FileResult(filename);
       LOG.debug("Result for bundle {}: {}", this.id, filename);

http://git-wip-us.apache.org/repos/asf/beam/blob/2b6a08f6/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 726411c..86e6989 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -1101,15 +1101,18 @@ public class TextIO {
       }
 
       @Override
+      public void write(String value) throws Exception {
+        writeLine(value);
+      }
+
+      @Override
       protected void writeFooter() throws Exception {
         writeIfNotNull(footer);
-        // Flush here because there is currently no other natural place to do this. [BEAM-1465]
-        out.flush();
       }
 
       @Override
-      public void write(String value) throws Exception {
-        writeLine(value);
+      protected void finishWrite() throws Exception {
+        out.flush();
       }
     }
   }

Reply via email to