This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch release-1.19
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.19 by this push:
     new ac4aa35c6e2 [FLINK-35217] Add missing fsync to #closeForCommit in some subclasses of RecoverableFsDataOutputStream. (#24722) (#24751)
ac4aa35c6e2 is described below

commit ac4aa35c6e2e2da87760ffbf45d85888b1976c2f
Author: Stefan Richter <srich...@apache.org>
AuthorDate: Tue Apr 30 22:25:27 2024 +0200

    [FLINK-35217] Add missing fsync to #closeForCommit in some subclasses of RecoverableFsDataOutputStream. (#24722) (#24751)
    
    (cherry picked from commit 80af4d502318348ba15a8f75a2a622ce9dbdc968)
---
 ...erFromPersistRecoverableFsDataOutputStream.java |  59 +++++++
 .../local/LocalRecoverableFsDataOutputStream.java  |  23 ++-
 .../AbstractRecoverableFsDataOutputStreamTest.java |  98 +++++++++++
 .../LocalRecoverableFsDataOutputStreamTest.java    | 188 +++++++++++++++++++++
 .../AzureBlobFsRecoverableDataOutputStream.java    |  17 +-
 ...AzureBlobFsRecoverableDataOutputStreamTest.java | 100 +++++++++++
 .../BaseHadoopFsRecoverableFsDataOutputStream.java |  12 +-
 .../hdfs/HadoopRecoverableFsDataOutputStream.java  |  17 +-
 .../HadoopRecoverableFsDataOutputStreamTest.java   |  89 ++++++++++
 9 files changed, 579 insertions(+), 24 deletions(-)
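
In essence, closeForCommit() in the affected stream classes previously closed the stream and built the committer from the current position without forcing the written bytes to disk; the shared implementation introduced here routes through persist(), which flushes and fsyncs before the stream is closed. A condensed before/after sketch, paraphrased from the local-file-system hunks below (illustrative only, not literal code from any single class):

    // Before (paraphrased): close and hand off, with no fsync of the written data.
    public Committer closeForCommit() throws IOException {
        final long pos = getPos();
        close();
        return new LocalCommitter(new LocalRecoverable(targetFile, tempFile, pos));
    }

    // After: persist() flushes and fsyncs first, then the stream is closed.
    public final Committer closeForCommit() throws IOException {
        Committer committer = createCommitterFromResumeRecoverable(persist());
        close();
        return committer;
    }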

diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/CommitterFromPersistRecoverableFsDataOutputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/CommitterFromPersistRecoverableFsDataOutputStream.java
new file mode 100644
index 00000000000..12e3fbc7f4d
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/CommitterFromPersistRecoverableFsDataOutputStream.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import java.io.IOException;
+
+/**
+ * {@link RecoverableFsDataOutputStream} with fixed implementation of {@link #closeForCommit()} that
+ * is based on using {@link #persist()} to ensure durability and creates the {@link
+ * org.apache.flink.core.fs.RecoverableFsDataOutputStream.Committer} from the corresponding {@link
+ * org.apache.flink.core.fs.RecoverableWriter.ResumeRecoverable}.
+ *
+ * @param <RESUME_RECOVERABLE> return type of #persist()
+ */
+public abstract class CommitterFromPersistRecoverableFsDataOutputStream<
+                RESUME_RECOVERABLE extends RecoverableWriter.ResumeRecoverable>
+        extends RecoverableFsDataOutputStream {
+
+    /** @see RecoverableFsDataOutputStream#persist() */
+    @Override
+    public abstract RESUME_RECOVERABLE persist() throws IOException;
+
+    /**
+     * @see RecoverableFsDataOutputStream#closeForCommit()
+     * @param recoverable a resume recoverable to create the committer from. Typically the parameter
+     *     is the return value of {@link #persist()}.
+     * @return the committer created from recoverable.
+     */
+    protected abstract Committer createCommitterFromResumeRecoverable(
+            RESUME_RECOVERABLE recoverable);
+
+    /**
+     * @see RecoverableFsDataOutputStream#closeForCommit()
+     * @implNote Calls persist to ensure durability of the written data and creates a committer
+     *     object from the return value of {@link #persist()}.
+     */
+    @Override
+    public final Committer closeForCommit() throws IOException {
+        Committer committer = createCommitterFromResumeRecoverable(persist());
+        close();
+        return committer;
+    }
+}
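
For orientation before the remaining hunks: a subclass of this new base class only has to provide persist(), which must make the written bytes durable (flush plus fsync), and a factory that turns the resulting ResumeRecoverable into a Committer; closeForCommit() is final and always persists before closing. A minimal hypothetical sketch, where the Sketch* names, createResumeRecoverable(), and the FileChannel field are placeholders rather than part of this patch:

    import java.io.IOException;
    import java.nio.channels.FileChannel;

    import org.apache.flink.core.fs.CommitterFromPersistRecoverableFsDataOutputStream;
    import org.apache.flink.core.fs.RecoverableWriter;

    // Hypothetical sketch only; a real implementation returns its file-system
    // specific recoverable and committer (see the local and Hadoop hunks below).
    abstract class SketchRecoverableFsDataOutputStream
            extends CommitterFromPersistRecoverableFsDataOutputStream<
                    RecoverableWriter.ResumeRecoverable> {

        private final FileChannel channel; // assumed handle to the temp file being written

        SketchRecoverableFsDataOutputStream(FileChannel channel) {
            this.channel = channel;
        }

        @Override
        public RecoverableWriter.ResumeRecoverable persist() throws IOException {
            flush();              // push buffered bytes to the OS
            channel.force(false); // fsync so the data survives a process or node crash
            return createResumeRecoverable(); // placeholder for the FS-specific recoverable
        }

        // Placeholder factories a concrete stream would implement.
        protected abstract RecoverableWriter.ResumeRecoverable createResumeRecoverable();

        @Override
        protected abstract Committer createCommitterFromResumeRecoverable(
                RecoverableWriter.ResumeRecoverable recoverable);

        // closeForCommit() is inherited: persist(), then close(), then return the committer.
    }
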
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java
index cc9c88fc4f4..c273c31960e 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java
@@ -19,9 +19,10 @@
 package org.apache.flink.core.fs.local;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.CommitterFromPersistRecoverableFsDataOutputStream;
 import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
 import org.apache.flink.core.fs.RecoverableWriter.CommitRecoverable;
-import org.apache.flink.core.fs.RecoverableWriter.ResumeRecoverable;
 
 import java.io.File;
 import java.io.FileNotFoundException;
@@ -40,7 +41,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** A {@link RecoverableFsDataOutputStream} for the {@link LocalFileSystem}. */
 @Internal
-public class LocalRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream {
+public class LocalRecoverableFsDataOutputStream
+        extends CommitterFromPersistRecoverableFsDataOutputStream<LocalRecoverable> {
 
     private final File targetFile;
 
@@ -78,6 +80,15 @@ public class LocalRecoverableFsDataOutputStream extends RecoverableFsDataOutputS
         this.fos = Channels.newOutputStream(fileChannel);
     }
 
+    @VisibleForTesting
+    LocalRecoverableFsDataOutputStream(
+            File targetFile, File tempFile, FileChannel fileChannel, OutputStream fos) {
+        this.targetFile = checkNotNull(targetFile);
+        this.tempFile = checkNotNull(tempFile);
+        this.fileChannel = fileChannel;
+        this.fos = fos;
+    }
+
     @Override
     public void write(int b) throws IOException {
         fos.write(b);
@@ -104,7 +115,7 @@ public class LocalRecoverableFsDataOutputStream extends RecoverableFsDataOutputS
     }
 
     @Override
-    public ResumeRecoverable persist() throws IOException {
+    public LocalRecoverable persist() throws IOException {
         // we call both flush and sync in order to ensure persistence on mounted
         // file systems, like NFS, EBS, EFS, ...
         flush();
@@ -114,10 +125,8 @@ public class LocalRecoverableFsDataOutputStream extends RecoverableFsDataOutputS
     }
 
     @Override
-    public Committer closeForCommit() throws IOException {
-        final long pos = getPos();
-        close();
-        return new LocalCommitter(new LocalRecoverable(targetFile, tempFile, pos));
+    protected Committer createCommitterFromResumeRecoverable(LocalRecoverable recoverable) {
+        return new LocalCommitter(recoverable);
     }
 
     @Override
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/local/AbstractRecoverableFsDataOutputStreamTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/local/AbstractRecoverableFsDataOutputStreamTest.java
new file mode 100644
index 00000000000..8b7b5c4b669
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/local/AbstractRecoverableFsDataOutputStreamTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs.local;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+/** Base class for testing implementations of {@link RecoverableFsDataOutputStream}. */
+public abstract class AbstractRecoverableFsDataOutputStreamTest {
+
+    /** Events for methods being called on the stream. */
+    public enum Event {
+        CLOSE,
+        FLUSH,
+        SYNC
+    }
+
+    @TempDir Path tmp;
+
+    /**
+     * Tests that #closeForCommit leads to a durable write to the temporary file and to target on
+     * commit.
+     */
+    @Test
+    public void testDurableWriteOnCommit() throws IOException {
+        // Setup
+        final int seed = 4711;
+        final Random random = new Random(seed);
+        final byte[] buffer = new byte[4 * 4096];
+        final List<LocalRecoverableFsDataOutputStreamTest.Event> testLog = new ArrayList<>();
+        final Path target = tmp.resolve("target");
+        final Path temp = tmp.resolve("temp");
+
+        Tuple2<RecoverableFsDataOutputStream, Closeable> testInstance =
+                createTestInstance(target, temp, testLog);
+
+        // Create test object
+        final RecoverableFsDataOutputStream testOutStreamInstance = testInstance.f0;
+
+        // Write test data
+        random.nextBytes(buffer);
+        testOutStreamInstance.write(buffer);
+
+        // Test closeForCommit
+        Assertions.assertTrue(testLog.isEmpty());
+        RecoverableFsDataOutputStream.Committer committer = testOutStreamInstance.closeForCommit();
+        Assertions.assertEquals(getExpectedResult(), testLog);
+
+        testInstance.f1.close();
+        Assertions.assertArrayEquals(buffer, FileUtils.readAllBytes(temp));
+
+        // Test commit
+        Assertions.assertFalse(target.toFile().exists());
+        committer.commit();
+        Assertions.assertTrue(target.toFile().exists());
+        Assertions.assertArrayEquals(buffer, FileUtils.readAllBytes(target));
+    }
+
+    public abstract Tuple2<RecoverableFsDataOutputStream, Closeable> createTestInstance(
+            Path target, Path temp, List<LocalRecoverableFsDataOutputStreamTest.Event> testLog)
+            throws IOException;
+
+    public List<Event> getExpectedResult() {
+        return Arrays.asList(
+                LocalRecoverableFsDataOutputStreamTest.Event.FLUSH,
+                LocalRecoverableFsDataOutputStreamTest.Event.SYNC,
+                LocalRecoverableFsDataOutputStreamTest.Event.CLOSE);
+    }
+}
diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStreamTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStreamTest.java
new file mode 100644
index 00000000000..be0e60aa0d4
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStreamTest.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs.local;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+
+import java.io.BufferedOutputStream;
+import java.io.Closeable;
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.List;
+
+/** Unit tests for {@link LocalRecoverableFsDataOutputStream}. */
+public class LocalRecoverableFsDataOutputStreamTest
+        extends AbstractRecoverableFsDataOutputStreamTest {
+
+    @Override
+    public Tuple2<RecoverableFsDataOutputStream, Closeable> createTestInstance(
+            Path target, Path temp, List<Event> testLog) throws IOException {
+        final FileChannel fileChannel =
+                new TestFileChannel(
+                        FileChannel.open(
+                                temp, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW),
+                        testLog);
+
+        final TestOutputStream fos =
+                new TestOutputStream(
+                        new BufferedOutputStream(Channels.newOutputStream(fileChannel)), testLog);
+
+        // Create test object
+        final RecoverableFsDataOutputStream testOutStreamInstance =
+                new LocalRecoverableFsDataOutputStream(
+                        target.toFile(), temp.toFile(), fileChannel, fos);
+
+        return new Tuple2<>(testOutStreamInstance, fos::actualClose);
+    }
+
+    private static class TestOutputStream extends FilterOutputStream {
+
+        private final List<Event> events;
+
+        public TestOutputStream(OutputStream out, List<Event> events) {
+            super(out);
+            this.events = events;
+        }
+
+        @Override
+        public void flush() throws IOException {
+            super.flush();
+            events.add(Event.FLUSH);
+        }
+
+        @Override
+        public void close() {
+            events.add(Event.CLOSE);
+            // Do nothing on close.
+        }
+
+        public void actualClose() throws IOException {
+            super.close();
+        }
+    }
+
+    static class TestFileChannel extends FileChannel {
+
+        final FileChannel delegate;
+
+        private final List<Event> events;
+
+        TestFileChannel(FileChannel delegate, List<Event> events) {
+            this.delegate = delegate;
+            this.events = events;
+        }
+
+        @Override
+        public int read(ByteBuffer dst) throws IOException {
+            return delegate.read(dst);
+        }
+
+        @Override
+        public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
+            return delegate.read(dsts, offset, length);
+        }
+
+        @Override
+        public int write(ByteBuffer src) throws IOException {
+            return delegate.write(src);
+        }
+
+        @Override
+        public long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
+            return delegate.write(srcs, offset, length);
+        }
+
+        @Override
+        public long position() throws IOException {
+            return delegate.position();
+        }
+
+        @Override
+        public FileChannel position(long newPosition) throws IOException {
+            return delegate.position(newPosition);
+        }
+
+        @Override
+        public long size() throws IOException {
+            return delegate.size();
+        }
+
+        @Override
+        public FileChannel truncate(long size) throws IOException {
+            return delegate.truncate(size);
+        }
+
+        @Override
+        public void force(boolean metaData) throws IOException {
+            delegate.force(metaData);
+            events.add(Event.SYNC);
+        }
+
+        @Override
+        public long transferTo(long position, long count, WritableByteChannel target)
+                throws IOException {
+            return delegate.transferTo(position, count, target);
+        }
+
+        @Override
+        public long transferFrom(ReadableByteChannel src, long position, long count)
+                throws IOException {
+            return delegate.transferFrom(src, position, count);
+        }
+
+        @Override
+        public int read(ByteBuffer dst, long position) throws IOException {
+            return delegate.read(dst, position);
+        }
+
+        @Override
+        public int write(ByteBuffer src, long position) throws IOException {
+            return delegate.write(src, position);
+        }
+
+        @Override
+        public MappedByteBuffer map(MapMode mode, long position, long size) throws IOException {
+            return delegate.map(mode, position, size);
+        }
+
+        @Override
+        public FileLock lock(long position, long size, boolean shared) throws IOException {
+            return delegate.lock(position, size, shared);
+        }
+
+        @Override
+        public FileLock tryLock(long position, long size, boolean shared) throws IOException {
+            return delegate.tryLock(position, size, shared);
+        }
+
+        @Override
+        protected void implCloseChannel() {}
+    }
+}
diff --git a/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureBlobFsRecoverableDataOutputStream.java b/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureBlobFsRecoverableDataOutputStream.java
index 1236c3cd540..232bc6a2cb7 100644
--- a/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureBlobFsRecoverableDataOutputStream.java
+++ b/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureBlobFsRecoverableDataOutputStream.java
@@ -63,6 +63,16 @@ public class AzureBlobFsRecoverableDataOutputStream
         this.out = fs.create(tempFile);
     }
 
+    /** Use only for testing! */
+    @VisibleForTesting
+    AzureBlobFsRecoverableDataOutputStream(
+            FileSystem fs, Path targetFile, Path tempFile, FSDataOutputStream out) {
+        this.fs = checkNotNull(fs);
+        this.targetFile = checkNotNull(targetFile);
+        this.tempFile = checkNotNull(tempFile);
+        this.out = out;
+    }
+
     AzureBlobFsRecoverableDataOutputStream(FileSystem fs, HadoopFsRecoverable recoverable)
             throws IOException {
         this.fs = checkNotNull(fs);
@@ -215,11 +225,8 @@ public class AzureBlobFsRecoverableDataOutputStream
     }
 
     @Override
-    public Committer closeForCommit() throws IOException {
-        final long pos = getPos();
-        close();
-        return new AzureBlobFsRecoverableDataOutputStream.ABFSCommitter(
-                fs, createHadoopFsRecoverable(pos));
+    protected Committer createCommitterFromResumeRecoverable(HadoopFsRecoverable recoverable) {
+        return new ABFSCommitter(fs, recoverable);
     }
 
     // ------------------------------------------------------------------------
diff --git a/flink-filesystems/flink-azure-fs-hadoop/src/test/java/org/apache/flink/fs/azurefs/AzureBlobFsRecoverableDataOutputStreamTest.java b/flink-filesystems/flink-azure-fs-hadoop/src/test/java/org/apache/flink/fs/azurefs/AzureBlobFsRecoverableDataOutputStreamTest.java
new file mode 100644
index 00000000000..3c451d793e9
--- /dev/null
+++ b/flink-filesystems/flink-azure-fs-hadoop/src/test/java/org/apache/flink/fs/azurefs/AzureBlobFsRecoverableDataOutputStreamTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.azurefs;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.local.AbstractRecoverableFsDataOutputStreamTest;
+import org.apache.flink.core.fs.local.LocalRecoverableFsDataOutputStreamTest;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+
+import java.io.BufferedOutputStream;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
+
+/** Unit tests for {@link AzureBlobFsRecoverableDataOutputStream}. */
+public class AzureBlobFsRecoverableDataOutputStreamTest
+        extends AbstractRecoverableFsDataOutputStreamTest {
+
+    @Override
+    public Tuple2<RecoverableFsDataOutputStream, Closeable> createTestInstance(
+            Path target, Path temp, List<Event> testLog) throws IOException {
+        final TestFSDataOutputStream fos =
+                new TestFSDataOutputStream(
+                        new BufferedOutputStream(Files.newOutputStream(temp)), testLog);
+
+        final AzureBlobFsRecoverableDataOutputStream testOutStreamInstance =
+                new AzureBlobFsRecoverableDataOutputStream(
+                        FileSystem.getLocal(new Configuration()),
+                        new org.apache.hadoop.fs.Path(target.toUri()),
+                        new org.apache.hadoop.fs.Path(temp.toUri()),
+                        fos);
+
+        return new Tuple2<>(testOutStreamInstance, fos::actualClose);
+    }
+
+    private static class TestFSDataOutputStream extends FSDataOutputStream {
+
+        private final List<Event> events;
+
+        public TestFSDataOutputStream(OutputStream out, List<Event> events) throws IOException {
+            super(out, new FileSystem.Statistics("test_stats"));
+            this.events = events;
+        }
+
+        @Override
+        public void hflush() throws IOException {
+            super.hflush();
+            events.add(Event.FLUSH);
+        }
+
+        @Override
+        public void hsync() throws IOException {
+            super.hsync();
+            events.add(Event.SYNC);
+        }
+
+        @Override
+        public void close() {
+            events.add(Event.CLOSE);
+            // Do nothing on close.
+        }
+
+        public void actualClose() throws IOException {
+            super.close();
+        }
+    }
+
+    @Override
+    public List<Event> getExpectedResult() {
+        // Seems that Azure does not require flush before sync, see
+        // https://github.com/apache/flink/pull/21508#discussion_r1064351162
+        return Arrays.asList(
+                LocalRecoverableFsDataOutputStreamTest.Event.SYNC,
+                LocalRecoverableFsDataOutputStreamTest.Event.CLOSE);
+    }
+}
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/BaseHadoopFsRecoverableFsDataOutputStream.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/BaseHadoopFsRecoverableFsDataOutputStream.java
index 93628a349a8..7a8563f15a4 100644
--- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/BaseHadoopFsRecoverableFsDataOutputStream.java
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/BaseHadoopFsRecoverableFsDataOutputStream.java
@@ -19,8 +19,7 @@
 package org.apache.flink.runtime.fs.hdfs;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
-import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.core.fs.CommitterFromPersistRecoverableFsDataOutputStream;
 
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -31,7 +30,7 @@ import java.io.IOException;
 /** Base class for ABFS and Hadoop recoverable stream. */
 @Internal
 public abstract class BaseHadoopFsRecoverableFsDataOutputStream
-        extends RecoverableFsDataOutputStream {
+        extends CommitterFromPersistRecoverableFsDataOutputStream<HadoopFsRecoverable> {
 
     protected FileSystem fs;
 
@@ -70,18 +69,15 @@ public abstract class BaseHadoopFsRecoverableFsDataOutputStream
     }
 
     @Override
-    public RecoverableWriter.ResumeRecoverable persist() throws IOException {
+    public HadoopFsRecoverable persist() throws IOException {
         sync();
         return createHadoopFsRecoverable(getPos());
     }
 
-    public HadoopFsRecoverable createHadoopFsRecoverable(long pos) throws IOException {
+    public HadoopFsRecoverable createHadoopFsRecoverable(long pos) {
         return new HadoopFsRecoverable(targetFile, tempFile, pos + initialFileSize);
     }
 
-    @Override
-    public abstract Committer closeForCommit() throws IOException;
-
     @Override
     public void close() throws IOException {
         out.close();
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java
index bf5f19a799e..8f695ea777e 100644
--- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.fs.hdfs;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
 import org.apache.flink.core.fs.RecoverableWriter.CommitRecoverable;
@@ -28,6 +29,7 @@ import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -66,6 +68,15 @@ class HadoopRecoverableFsDataOutputStream extends BaseHadoopFsRecoverableFsDataO
         this.out = fs.create(tempFile);
     }
 
+    @VisibleForTesting
+    HadoopRecoverableFsDataOutputStream(
+            FileSystem fs, Path targetFile, Path tempFile, FSDataOutputStream out) {
+        this.fs = checkNotNull(fs);
+        this.targetFile = checkNotNull(targetFile);
+        this.tempFile = checkNotNull(tempFile);
+        this.out = out;
+    }
+
     HadoopRecoverableFsDataOutputStream(FileSystem fs, HadoopFsRecoverable recoverable)
             throws IOException {
 
@@ -95,10 +106,8 @@ class HadoopRecoverableFsDataOutputStream extends BaseHadoopFsRecoverableFsDataO
     }
 
     @Override
-    public Committer closeForCommit() throws IOException {
-        final long pos = getPos();
-        close();
-        return new HadoopFsCommitter(fs, createHadoopFsRecoverable(pos));
+    protected Committer createCommitterFromResumeRecoverable(HadoopFsRecoverable recoverable) {
+        return new HadoopFsCommitter(fs, recoverable);
     }
 
     // ------------------------------------------------------------------------
diff --git a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStreamTest.java b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStreamTest.java
new file mode 100644
index 00000000000..bccce5be843
--- /dev/null
+++ b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStreamTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.hdfs;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.local.AbstractRecoverableFsDataOutputStreamTest;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+
+import java.io.BufferedOutputStream;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+
+/** Unit tests for {@link HadoopRecoverableFsDataOutputStream}. */
+public class HadoopRecoverableFsDataOutputStreamTest
+        extends AbstractRecoverableFsDataOutputStreamTest {
+
+    @Override
+    public Tuple2<RecoverableFsDataOutputStream, Closeable> createTestInstance(
+            Path target, Path temp, List<Event> testLog) throws IOException {
+        final TestFSDataOutputStream fos =
+                new TestFSDataOutputStream(
+                        new BufferedOutputStream(Files.newOutputStream(temp)), testLog);
+
+        final HadoopRecoverableFsDataOutputStream testOutStreamInstance =
+                new HadoopRecoverableFsDataOutputStream(
+                        FileSystem.getLocal(new Configuration()),
+                        new org.apache.hadoop.fs.Path(target.toUri()),
+                        new org.apache.hadoop.fs.Path(temp.toUri()),
+                        fos);
+
+        return new Tuple2<>(testOutStreamInstance, fos::actualClose);
+    }
+
+    private static class TestFSDataOutputStream extends FSDataOutputStream {
+
+        private final List<Event> events;
+
+        public TestFSDataOutputStream(OutputStream out, List<Event> events) throws IOException {
+            super(out, new FileSystem.Statistics("test_stats"));
+            this.events = events;
+        }
+
+        @Override
+        public void hflush() throws IOException {
+            super.hflush();
+            events.add(Event.FLUSH);
+        }
+
+        @Override
+        public void hsync() throws IOException {
+            super.hsync();
+            events.add(Event.SYNC);
+        }
+
+        @Override
+        public void close() {
+            events.add(Event.CLOSE);
+            // Do nothing on close.
+        }
+
+        public void actualClose() throws IOException {
+            super.close();
+        }
+    }
+}
