This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch release-1.19 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.19 by this push: new ac4aa35c6e2 [FLINK-35217] Add missing fsync to #closeForCommit in some subclasses of RecoverableFsDataOutputStream. (#24722) (#24751) ac4aa35c6e2 is described below commit ac4aa35c6e2e2da87760ffbf45d85888b1976c2f Author: Stefan Richter <srich...@apache.org> AuthorDate: Tue Apr 30 22:25:27 2024 +0200 [FLINK-35217] Add missing fsync to #closeForCommit in some subclasses of RecoverableFsDataOutputStream. (#24722) (#24751) (cherry picked from commit 80af4d502318348ba15a8f75a2a622ce9dbdc968) --- ...erFromPersistRecoverableFsDataOutputStream.java | 59 +++++++ .../local/LocalRecoverableFsDataOutputStream.java | 23 ++- .../AbstractRecoverableFsDataOutputStreamTest.java | 98 +++++++++++ .../LocalRecoverableFsDataOutputStreamTest.java | 188 +++++++++++++++++++++ .../AzureBlobFsRecoverableDataOutputStream.java | 17 +- ...AzureBlobFsRecoverableDataOutputStreamTest.java | 100 +++++++++++ .../BaseHadoopFsRecoverableFsDataOutputStream.java | 12 +- .../hdfs/HadoopRecoverableFsDataOutputStream.java | 17 +- .../HadoopRecoverableFsDataOutputStreamTest.java | 89 ++++++++++ 9 files changed, 579 insertions(+), 24 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/CommitterFromPersistRecoverableFsDataOutputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/CommitterFromPersistRecoverableFsDataOutputStream.java new file mode 100644 index 00000000000..12e3fbc7f4d --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/core/fs/CommitterFromPersistRecoverableFsDataOutputStream.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.fs; + +import java.io.IOException; + +/** + * {@link RecoverableFsDataOutputStream} with fixed implementation of {@link #closeForCommit()} that + * is based on using {@link #persist()} to ensure durability and creates the {@link + * org.apache.flink.core.fs.RecoverableFsDataOutputStream.Committer} from the corresponding {@link + * org.apache.flink.core.fs.RecoverableWriter.ResumeRecoverable}. + * + * @param <RESUME_RECOVERABLE> return type of #persist() + */ +public abstract class CommitterFromPersistRecoverableFsDataOutputStream< + RESUME_RECOVERABLE extends RecoverableWriter.ResumeRecoverable> + extends RecoverableFsDataOutputStream { + + /** @see RecoverableFsDataOutputStream#persist() */ + @Override + public abstract RESUME_RECOVERABLE persist() throws IOException; + + /** + * @see RecoverableFsDataOutputStream#closeForCommit() + * @param recoverable a resume recoverable to create the committer from. Typically the parameter + * is the return value of {@link #persist()}. + * @return the committer created from recoverable. + */ + protected abstract Committer createCommitterFromResumeRecoverable( + RESUME_RECOVERABLE recoverable); + + /** + * @see RecoverableFsDataOutputStream#closeForCommit() + * @implNote Calls persist to ensure durability of the written data and creates a committer + * object from the return value of {@link #persist()}. + */ + @Override + public final Committer closeForCommit() throws IOException { + Committer committer = createCommitterFromResumeRecoverable(persist()); + close(); + return committer; + } +} diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java index cc9c88fc4f4..c273c31960e 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStream.java @@ -19,9 +19,10 @@ package org.apache.flink.core.fs.local; import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.core.fs.CommitterFromPersistRecoverableFsDataOutputStream; import org.apache.flink.core.fs.RecoverableFsDataOutputStream; import org.apache.flink.core.fs.RecoverableWriter.CommitRecoverable; -import org.apache.flink.core.fs.RecoverableWriter.ResumeRecoverable; import java.io.File; import java.io.FileNotFoundException; @@ -40,7 +41,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull; /** A {@link RecoverableFsDataOutputStream} for the {@link LocalFileSystem}. */ @Internal -public class LocalRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream { +public class LocalRecoverableFsDataOutputStream + extends CommitterFromPersistRecoverableFsDataOutputStream<LocalRecoverable> { private final File targetFile; @@ -78,6 +80,15 @@ public class LocalRecoverableFsDataOutputStream extends RecoverableFsDataOutputS this.fos = Channels.newOutputStream(fileChannel); } + @VisibleForTesting + LocalRecoverableFsDataOutputStream( + File targetFile, File tempFile, FileChannel fileChannel, OutputStream fos) { + this.targetFile = checkNotNull(targetFile); + this.tempFile = checkNotNull(tempFile); + this.fileChannel = fileChannel; + this.fos = fos; + } + @Override public void write(int b) throws IOException { fos.write(b); @@ -104,7 +115,7 @@ public class LocalRecoverableFsDataOutputStream extends RecoverableFsDataOutputS } @Override - public ResumeRecoverable persist() throws IOException { + public LocalRecoverable persist() throws IOException { // we call both flush and sync in order to ensure persistence on mounted // file systems, like NFS, EBS, EFS, ... flush(); @@ -114,10 +125,8 @@ public class LocalRecoverableFsDataOutputStream extends RecoverableFsDataOutputS } @Override - public Committer closeForCommit() throws IOException { - final long pos = getPos(); - close(); - return new LocalCommitter(new LocalRecoverable(targetFile, tempFile, pos)); + protected Committer createCommitterFromResumeRecoverable(LocalRecoverable recoverable) { + return new LocalCommitter(recoverable); } @Override diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/local/AbstractRecoverableFsDataOutputStreamTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/local/AbstractRecoverableFsDataOutputStreamTest.java new file mode 100644 index 00000000000..8b7b5c4b669 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/core/fs/local/AbstractRecoverableFsDataOutputStreamTest.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.fs.local; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.fs.RecoverableFsDataOutputStream; +import org.apache.flink.util.FileUtils; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Random; + +/** Base class for testing implementations of {@link RecoverableFsDataOutputStream}. */ +public abstract class AbstractRecoverableFsDataOutputStreamTest { + + /** Events for methods being called on the stream. */ + public enum Event { + CLOSE, + FLUSH, + SYNC + } + + @TempDir Path tmp; + + /** + * Tests that #closeForCommit leads to a durable write to the temporary file and to target on + * commit. + */ + @Test + public void testDurableWriteOnCommit() throws IOException { + // Setup + final int seed = 4711; + final Random random = new Random(seed); + final byte[] buffer = new byte[4 * 4096]; + final List<LocalRecoverableFsDataOutputStreamTest.Event> testLog = new ArrayList<>(); + final Path target = tmp.resolve("target"); + final Path temp = tmp.resolve("temp"); + + Tuple2<RecoverableFsDataOutputStream, Closeable> testInstance = + createTestInstance(target, temp, testLog); + + // Create test object + final RecoverableFsDataOutputStream testOutStreamInstance = testInstance.f0; + + // Write test data + random.nextBytes(buffer); + testOutStreamInstance.write(buffer); + + // Test closeForCommit + Assertions.assertTrue(testLog.isEmpty()); + RecoverableFsDataOutputStream.Committer committer = testOutStreamInstance.closeForCommit(); + Assertions.assertEquals(getExpectedResult(), testLog); + + testInstance.f1.close(); + Assertions.assertArrayEquals(buffer, FileUtils.readAllBytes(temp)); + + // Test commit + Assertions.assertFalse(target.toFile().exists()); + committer.commit(); + Assertions.assertTrue(target.toFile().exists()); + Assertions.assertArrayEquals(buffer, FileUtils.readAllBytes(target)); + } + + public abstract Tuple2<RecoverableFsDataOutputStream, Closeable> createTestInstance( + Path target, Path temp, List<LocalRecoverableFsDataOutputStreamTest.Event> testLog) + throws IOException; + + public List<Event> getExpectedResult() { + return Arrays.asList( + LocalRecoverableFsDataOutputStreamTest.Event.FLUSH, + LocalRecoverableFsDataOutputStreamTest.Event.SYNC, + LocalRecoverableFsDataOutputStreamTest.Event.CLOSE); + } +} diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStreamTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStreamTest.java new file mode 100644 index 00000000000..be0e60aa0d4 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/core/fs/local/LocalRecoverableFsDataOutputStreamTest.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.fs.local; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.fs.RecoverableFsDataOutputStream; + +import java.io.BufferedOutputStream; +import java.io.Closeable; +import java.io.FilterOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.FileChannel; +import java.nio.channels.FileLock; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.WritableByteChannel; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.List; + +/** Unit tests for {@link LocalRecoverableFsDataOutputStream}. */ +public class LocalRecoverableFsDataOutputStreamTest + extends AbstractRecoverableFsDataOutputStreamTest { + + @Override + public Tuple2<RecoverableFsDataOutputStream, Closeable> createTestInstance( + Path target, Path temp, List<Event> testLog) throws IOException { + final FileChannel fileChannel = + new TestFileChannel( + FileChannel.open( + temp, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW), + testLog); + + final TestOutputStream fos = + new TestOutputStream( + new BufferedOutputStream(Channels.newOutputStream(fileChannel)), testLog); + + // Create test object + final RecoverableFsDataOutputStream testOutStreamInstance = + new LocalRecoverableFsDataOutputStream( + target.toFile(), temp.toFile(), fileChannel, fos); + + return new Tuple2<>(testOutStreamInstance, fos::actualClose); + } + + private static class TestOutputStream extends FilterOutputStream { + + private final List<Event> events; + + public TestOutputStream(OutputStream out, List<Event> events) { + super(out); + this.events = events; + } + + @Override + public void flush() throws IOException { + super.flush(); + events.add(Event.FLUSH); + } + + @Override + public void close() { + events.add(Event.CLOSE); + // Do nothing on close. + } + + public void actualClose() throws IOException { + super.close(); + } + } + + static class TestFileChannel extends FileChannel { + + final FileChannel delegate; + + private final List<Event> events; + + TestFileChannel(FileChannel delegate, List<Event> events) { + this.delegate = delegate; + this.events = events; + } + + @Override + public int read(ByteBuffer dst) throws IOException { + return delegate.read(dst); + } + + @Override + public long read(ByteBuffer[] dsts, int offset, int length) throws IOException { + return delegate.read(dsts, offset, length); + } + + @Override + public int write(ByteBuffer src) throws IOException { + return delegate.write(src); + } + + @Override + public long write(ByteBuffer[] srcs, int offset, int length) throws IOException { + return delegate.write(srcs, offset, length); + } + + @Override + public long position() throws IOException { + return delegate.position(); + } + + @Override + public FileChannel position(long newPosition) throws IOException { + return delegate.position(newPosition); + } + + @Override + public long size() throws IOException { + return delegate.size(); + } + + @Override + public FileChannel truncate(long size) throws IOException { + return delegate.truncate(size); + } + + @Override + public void force(boolean metaData) throws IOException { + delegate.force(metaData); + events.add(Event.SYNC); + } + + @Override + public long transferTo(long position, long count, WritableByteChannel target) + throws IOException { + return delegate.transferTo(position, count, target); + } + + @Override + public long transferFrom(ReadableByteChannel src, long position, long count) + throws IOException { + return delegate.transferFrom(src, position, count); + } + + @Override + public int read(ByteBuffer dst, long position) throws IOException { + return delegate.read(dst, position); + } + + @Override + public int write(ByteBuffer src, long position) throws IOException { + return delegate.write(src, position); + } + + @Override + public MappedByteBuffer map(MapMode mode, long position, long size) throws IOException { + return delegate.map(mode, position, size); + } + + @Override + public FileLock lock(long position, long size, boolean shared) throws IOException { + return delegate.lock(position, size, shared); + } + + @Override + public FileLock tryLock(long position, long size, boolean shared) throws IOException { + return delegate.tryLock(position, size, shared); + } + + @Override + protected void implCloseChannel() {} + } +} diff --git a/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureBlobFsRecoverableDataOutputStream.java b/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureBlobFsRecoverableDataOutputStream.java index 1236c3cd540..232bc6a2cb7 100644 --- a/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureBlobFsRecoverableDataOutputStream.java +++ b/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureBlobFsRecoverableDataOutputStream.java @@ -63,6 +63,16 @@ public class AzureBlobFsRecoverableDataOutputStream this.out = fs.create(tempFile); } + /** Use only for testing! */ + @VisibleForTesting + AzureBlobFsRecoverableDataOutputStream( + FileSystem fs, Path targetFile, Path tempFile, FSDataOutputStream out) { + this.fs = checkNotNull(fs); + this.targetFile = checkNotNull(targetFile); + this.tempFile = checkNotNull(tempFile); + this.out = out; + } + AzureBlobFsRecoverableDataOutputStream(FileSystem fs, HadoopFsRecoverable recoverable) throws IOException { this.fs = checkNotNull(fs); @@ -215,11 +225,8 @@ public class AzureBlobFsRecoverableDataOutputStream } @Override - public Committer closeForCommit() throws IOException { - final long pos = getPos(); - close(); - return new AzureBlobFsRecoverableDataOutputStream.ABFSCommitter( - fs, createHadoopFsRecoverable(pos)); + protected Committer createCommitterFromResumeRecoverable(HadoopFsRecoverable recoverable) { + return new ABFSCommitter(fs, recoverable); } // ------------------------------------------------------------------------ diff --git a/flink-filesystems/flink-azure-fs-hadoop/src/test/java/org/apache/flink/fs/azurefs/AzureBlobFsRecoverableDataOutputStreamTest.java b/flink-filesystems/flink-azure-fs-hadoop/src/test/java/org/apache/flink/fs/azurefs/AzureBlobFsRecoverableDataOutputStreamTest.java new file mode 100644 index 00000000000..3c451d793e9 --- /dev/null +++ b/flink-filesystems/flink-azure-fs-hadoop/src/test/java/org/apache/flink/fs/azurefs/AzureBlobFsRecoverableDataOutputStreamTest.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.azurefs; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.fs.RecoverableFsDataOutputStream; +import org.apache.flink.core.fs.local.AbstractRecoverableFsDataOutputStreamTest; +import org.apache.flink.core.fs.local.LocalRecoverableFsDataOutputStreamTest; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; + +import java.io.BufferedOutputStream; +import java.io.Closeable; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.List; + +/** Unit tests for {@link AzureBlobFsRecoverableDataOutputStream}. */ +public class AzureBlobFsRecoverableDataOutputStreamTest + extends AbstractRecoverableFsDataOutputStreamTest { + + @Override + public Tuple2<RecoverableFsDataOutputStream, Closeable> createTestInstance( + Path target, Path temp, List<Event> testLog) throws IOException { + final TestFSDataOutputStream fos = + new TestFSDataOutputStream( + new BufferedOutputStream(Files.newOutputStream(temp)), testLog); + + final AzureBlobFsRecoverableDataOutputStream testOutStreamInstance = + new AzureBlobFsRecoverableDataOutputStream( + FileSystem.getLocal(new Configuration()), + new org.apache.hadoop.fs.Path(target.toUri()), + new org.apache.hadoop.fs.Path(temp.toUri()), + fos); + + return new Tuple2<>(testOutStreamInstance, fos::actualClose); + } + + private static class TestFSDataOutputStream extends FSDataOutputStream { + + private final List<Event> events; + + public TestFSDataOutputStream(OutputStream out, List<Event> events) throws IOException { + super(out, new FileSystem.Statistics("test_stats")); + this.events = events; + } + + @Override + public void hflush() throws IOException { + super.hflush(); + events.add(Event.FLUSH); + } + + @Override + public void hsync() throws IOException { + super.hsync(); + events.add(Event.SYNC); + } + + @Override + public void close() { + events.add(Event.CLOSE); + // Do nothing on close. + } + + public void actualClose() throws IOException { + super.close(); + } + } + + @Override + public List<Event> getExpectedResult() { + // Seems that Azure does not require flush before sync, see + // https://github.com/apache/flink/pull/21508#discussion_r1064351162 + return Arrays.asList( + LocalRecoverableFsDataOutputStreamTest.Event.SYNC, + LocalRecoverableFsDataOutputStreamTest.Event.CLOSE); + } +} diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/BaseHadoopFsRecoverableFsDataOutputStream.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/BaseHadoopFsRecoverableFsDataOutputStream.java index 93628a349a8..7a8563f15a4 100644 --- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/BaseHadoopFsRecoverableFsDataOutputStream.java +++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/BaseHadoopFsRecoverableFsDataOutputStream.java @@ -19,8 +19,7 @@ package org.apache.flink.runtime.fs.hdfs; import org.apache.flink.annotation.Internal; -import org.apache.flink.core.fs.RecoverableFsDataOutputStream; -import org.apache.flink.core.fs.RecoverableWriter; +import org.apache.flink.core.fs.CommitterFromPersistRecoverableFsDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; @@ -31,7 +30,7 @@ import java.io.IOException; /** Base class for ABFS and Hadoop recoverable stream. */ @Internal public abstract class BaseHadoopFsRecoverableFsDataOutputStream - extends RecoverableFsDataOutputStream { + extends CommitterFromPersistRecoverableFsDataOutputStream<HadoopFsRecoverable> { protected FileSystem fs; @@ -70,18 +69,15 @@ public abstract class BaseHadoopFsRecoverableFsDataOutputStream } @Override - public RecoverableWriter.ResumeRecoverable persist() throws IOException { + public HadoopFsRecoverable persist() throws IOException { sync(); return createHadoopFsRecoverable(getPos()); } - public HadoopFsRecoverable createHadoopFsRecoverable(long pos) throws IOException { + public HadoopFsRecoverable createHadoopFsRecoverable(long pos) { return new HadoopFsRecoverable(targetFile, tempFile, pos + initialFileSize); } - @Override - public abstract Committer closeForCommit() throws IOException; - @Override public void close() throws IOException { out.close(); diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java index bf5f19a799e..8f695ea777e 100644 --- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java +++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.fs.hdfs; import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.time.Deadline; import org.apache.flink.core.fs.RecoverableFsDataOutputStream; import org.apache.flink.core.fs.RecoverableWriter.CommitRecoverable; @@ -28,6 +29,7 @@ import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.IOUtils; import org.apache.flink.util.Preconditions; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -66,6 +68,15 @@ class HadoopRecoverableFsDataOutputStream extends BaseHadoopFsRecoverableFsDataO this.out = fs.create(tempFile); } + @VisibleForTesting + HadoopRecoverableFsDataOutputStream( + FileSystem fs, Path targetFile, Path tempFile, FSDataOutputStream out) { + this.fs = checkNotNull(fs); + this.targetFile = checkNotNull(targetFile); + this.tempFile = checkNotNull(tempFile); + this.out = out; + } + HadoopRecoverableFsDataOutputStream(FileSystem fs, HadoopFsRecoverable recoverable) throws IOException { @@ -95,10 +106,8 @@ class HadoopRecoverableFsDataOutputStream extends BaseHadoopFsRecoverableFsDataO } @Override - public Committer closeForCommit() throws IOException { - final long pos = getPos(); - close(); - return new HadoopFsCommitter(fs, createHadoopFsRecoverable(pos)); + protected Committer createCommitterFromResumeRecoverable(HadoopFsRecoverable recoverable) { + return new HadoopFsCommitter(fs, recoverable); } // ------------------------------------------------------------------------ diff --git a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStreamTest.java b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStreamTest.java new file mode 100644 index 00000000000..bccce5be843 --- /dev/null +++ b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStreamTest.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.fs.hdfs; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.fs.RecoverableFsDataOutputStream; +import org.apache.flink.core.fs.local.AbstractRecoverableFsDataOutputStreamTest; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; + +import java.io.BufferedOutputStream; +import java.io.Closeable; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; + +/** Unit tests for {@link HadoopRecoverableFsDataOutputStream}. */ +public class HadoopRecoverableFsDataOutputStreamTest + extends AbstractRecoverableFsDataOutputStreamTest { + + @Override + public Tuple2<RecoverableFsDataOutputStream, Closeable> createTestInstance( + Path target, Path temp, List<Event> testLog) throws IOException { + final TestFSDataOutputStream fos = + new TestFSDataOutputStream( + new BufferedOutputStream(Files.newOutputStream(temp)), testLog); + + final HadoopRecoverableFsDataOutputStream testOutStreamInstance = + new HadoopRecoverableFsDataOutputStream( + FileSystem.getLocal(new Configuration()), + new org.apache.hadoop.fs.Path(target.toUri()), + new org.apache.hadoop.fs.Path(temp.toUri()), + fos); + + return new Tuple2<>(testOutStreamInstance, fos::actualClose); + } + + private static class TestFSDataOutputStream extends FSDataOutputStream { + + private final List<Event> events; + + public TestFSDataOutputStream(OutputStream out, List<Event> events) throws IOException { + super(out, new FileSystem.Statistics("test_stats")); + this.events = events; + } + + @Override + public void hflush() throws IOException { + super.hflush(); + events.add(Event.FLUSH); + } + + @Override + public void hsync() throws IOException { + super.hsync(); + events.add(Event.SYNC); + } + + @Override + public void close() { + events.add(Event.CLOSE); + // Do nothing on close. + } + + public void actualClose() throws IOException { + super.close(); + } + } +}