This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new ba2b55df207 [FLINK-30561][state/changelog] fix changelog local cache file not found ba2b55df207 is described below commit ba2b55df207fb79ad776eaf64ec8a6c1ab27bac9 Author: wangfeifan <zoltar9...@163.com> AuthorDate: Mon Jan 16 11:15:39 2023 +0800 [FLINK-30561][state/changelog] fix changelog local cache file not found --- flink-dstl/flink-dstl-dfs/pom.xml | 8 ++ .../fs/ChangelogStreamHandleReaderWithCache.java | 35 ++++--- .../ChangelogStreamHandleReaderWithCacheTest.java | 115 +++++++++++++++++++++ 3 files changed, 143 insertions(+), 15 deletions(-) diff --git a/flink-dstl/flink-dstl-dfs/pom.xml b/flink-dstl/flink-dstl-dfs/pom.xml index 2854687103e..2e07089ceb2 100644 --- a/flink-dstl/flink-dstl-dfs/pom.xml +++ b/flink-dstl/flink-dstl-dfs/pom.xml @@ -58,6 +58,14 @@ under the License. <!-- test dependencies--> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-runtime</artifactId> diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogStreamHandleReaderWithCache.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogStreamHandleReaderWithCache.java index 1501cd7482f..70f2f66ef35 100644 --- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogStreamHandleReaderWithCache.java +++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogStreamHandleReaderWithCache.java @@ -161,23 +161,28 @@ class ChangelogStreamHandleReaderWithCache implements ChangelogStreamHandleReade private DataInputStream wrapStream(Path dfsPath, FileInputStream fin) { return new DataInputStream(new BufferedInputStream(fin)) { + private boolean closed = false; + @Override public void close() throws 
IOException { - try { - super.close(); - } finally { - cache.computeIfPresent( - dfsPath, - (key, value) -> { - value.release(); - if (value.getReferenceCounter() == NO_USING_REF_COUNT) { - cacheCleanScheduler.schedule( - () -> cleanCacheFile(dfsPath), - cacheIdleMillis, - TimeUnit.MILLISECONDS); - } - return value; - }); + if (!closed) { + closed = true; + try { + super.close(); + } finally { + cache.computeIfPresent( + dfsPath, + (key, value) -> { + value.release(); + if (value.getReferenceCounter() == NO_USING_REF_COUNT) { + cacheCleanScheduler.schedule( + () -> cleanCacheFile(dfsPath), + cacheIdleMillis, + TimeUnit.MILLISECONDS); + } + return value; + }); + } } } }; diff --git a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStreamHandleReaderWithCacheTest.java b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStreamHandleReaderWithCacheTest.java new file mode 100644 index 00000000000..76a9c891f1b --- /dev/null +++ b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStreamHandleReaderWithCacheTest.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.changelog.fs; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.FileSystemFactory; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.fs.local.LocalFileSystem; +import org.apache.flink.core.plugin.TestingPluginManager; +import org.apache.flink.runtime.state.filesystem.FileStateHandle; +import org.apache.flink.util.IOUtils; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URI; +import java.time.Duration; +import java.util.Collections; +import java.util.UUID; + +import static java.util.Collections.singletonMap; +import static org.apache.flink.changelog.fs.FsStateChangelogOptions.CACHE_IDLE_TIMEOUT; + +/** {@link ChangelogStreamHandleReaderWithCache} test. 
*/ +class ChangelogStreamHandleReaderWithCacheTest { + + @TempDir java.nio.file.Path tempFolder; + + @Test + void testCloseStreamTwice() throws Exception { + String tempFolderPath = tempFolder.toUri().getPath(); + + registerFileSystem( + new LocalFileSystem() { + @Override + public boolean isDistributedFS() { + return true; + } + }, + tempFolder.toUri().getScheme()); + + byte[] data = {0x00}; // not compressed, empty data + Path handlePath = new Path(tempFolderPath, UUID.randomUUID().toString()); + FileStateHandle stateHandle = prepareFileStateHandle(handlePath, data); + + Configuration configuration = new Configuration(); + configuration.set(CACHE_IDLE_TIMEOUT, Duration.ofDays(365)); // cache forever + configuration.set(CoreOptions.TMP_DIRS, tempFolderPath); + + try (ChangelogStreamHandleReaderWithCache reader = + new ChangelogStreamHandleReaderWithCache(configuration)) { + + DataInputStream inputStream = reader.openAndSeek(stateHandle, 0L); + + inputStream.close(); + inputStream.close(); // close twice + + reader.openAndSeek(stateHandle, 0L); // should not throw FileNotFoundException + } + } + + private FileStateHandle prepareFileStateHandle(Path path, byte[] data) throws IOException { + try (InputStream inputStream = new ByteArrayInputStream(data); + OutputStream outputStream = + path.getFileSystem().create(path, FileSystem.WriteMode.OVERWRITE)) { + + IOUtils.copyBytes(inputStream, outputStream); + } + return new FileStateHandle(path, data.length); + } + + private static void registerFileSystem(FileSystem fs, String scheme) { + FileSystem.initialize( + new Configuration(), + new TestingPluginManager( + singletonMap( + FileSystemFactory.class, + Collections.singleton( + new FileSystemFactory() { + @Override + public FileSystem create(URI fsUri) { + return fs; + } + + @Override + public String getScheme() { + return scheme; + } + }) + .iterator()))); + } +}