This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new ba2b55df207 [FLINK-30561][state/changelog] fix changelog local cache 
file not found
ba2b55df207 is described below

commit ba2b55df207fb79ad776eaf64ec8a6c1ab27bac9
Author: wangfeifan <zoltar9...@163.com>
AuthorDate: Mon Jan 16 11:15:39 2023 +0800

    [FLINK-30561][state/changelog] fix changelog local cache file not found
---
 flink-dstl/flink-dstl-dfs/pom.xml                  |   8 ++
 .../fs/ChangelogStreamHandleReaderWithCache.java   |  35 ++++---
 .../ChangelogStreamHandleReaderWithCacheTest.java  | 115 +++++++++++++++++++++
 3 files changed, 143 insertions(+), 15 deletions(-)

diff --git a/flink-dstl/flink-dstl-dfs/pom.xml 
b/flink-dstl/flink-dstl-dfs/pom.xml
index 2854687103e..2e07089ceb2 100644
--- a/flink-dstl/flink-dstl-dfs/pom.xml
+++ b/flink-dstl/flink-dstl-dfs/pom.xml
@@ -58,6 +58,14 @@ under the License.
 
         <!-- test dependencies-->
 
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-core</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+                       <type>test-jar</type>
+               </dependency>
+
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-runtime</artifactId>
diff --git 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogStreamHandleReaderWithCache.java
 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogStreamHandleReaderWithCache.java
index 1501cd7482f..70f2f66ef35 100644
--- 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogStreamHandleReaderWithCache.java
+++ 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogStreamHandleReaderWithCache.java
@@ -161,23 +161,28 @@ class ChangelogStreamHandleReaderWithCache implements 
ChangelogStreamHandleReade
 
     private DataInputStream wrapStream(Path dfsPath, FileInputStream fin) {
         return new DataInputStream(new BufferedInputStream(fin)) {
+            private boolean closed = false;
+
             @Override
             public void close() throws IOException {
-                try {
-                    super.close();
-                } finally {
-                    cache.computeIfPresent(
-                            dfsPath,
-                            (key, value) -> {
-                                value.release();
-                                if (value.getReferenceCounter() == 
NO_USING_REF_COUNT) {
-                                    cacheCleanScheduler.schedule(
-                                            () -> cleanCacheFile(dfsPath),
-                                            cacheIdleMillis,
-                                            TimeUnit.MILLISECONDS);
-                                }
-                                return value;
-                            });
+                if (!closed) {
+                    closed = true;
+                    try {
+                        super.close();
+                    } finally {
+                        cache.computeIfPresent(
+                                dfsPath,
+                                (key, value) -> {
+                                    value.release();
+                                    if (value.getReferenceCounter() == 
NO_USING_REF_COUNT) {
+                                        cacheCleanScheduler.schedule(
+                                                () -> cleanCacheFile(dfsPath),
+                                                cacheIdleMillis,
+                                                TimeUnit.MILLISECONDS);
+                                    }
+                                    return value;
+                                });
+                    }
                 }
             }
         };
diff --git 
a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStreamHandleReaderWithCacheTest.java
 
b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStreamHandleReaderWithCacheTest.java
new file mode 100644
index 00000000000..76a9c891f1b
--- /dev/null
+++ 
b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStreamHandleReaderWithCacheTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.local.LocalFileSystem;
+import org.apache.flink.core.plugin.TestingPluginManager;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+import org.apache.flink.util.IOUtils;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.UUID;
+
+import static java.util.Collections.singletonMap;
+import static 
org.apache.flink.changelog.fs.FsStateChangelogOptions.CACHE_IDLE_TIMEOUT;
+
+/** {@link ChangelogStreamHandleReaderWithCache} test. */
+class ChangelogStreamHandleReaderWithCacheTest {
+
+    @TempDir java.nio.file.Path tempFolder;
+
+    @Test
+    void testCloseStreamTwice() throws Exception {
+        String tempFolderPath = tempFolder.toUri().getPath();
+
+        registerFileSystem(
+                new LocalFileSystem() {
+                    @Override
+                    public boolean isDistributedFS() {
+                        return true;
+                    }
+                },
+                tempFolder.toUri().getScheme());
+
+        byte[] data = {0x00}; // not compressed, empty data
+        Path handlePath = new Path(tempFolderPath, 
UUID.randomUUID().toString());
+        FileStateHandle stateHandle = prepareFileStateHandle(handlePath, data);
+
+        Configuration configuration = new Configuration();
+        configuration.set(CACHE_IDLE_TIMEOUT, Duration.ofDays(365)); // cache 
forever
+        configuration.set(CoreOptions.TMP_DIRS, tempFolderPath);
+
+        try (ChangelogStreamHandleReaderWithCache reader =
+                new ChangelogStreamHandleReaderWithCache(configuration)) {
+
+            DataInputStream inputStream = reader.openAndSeek(stateHandle, 0L);
+
+            inputStream.close();
+            inputStream.close(); // close twice
+
+            reader.openAndSeek(stateHandle, 0L); // should not throw 
FileNotFoundException
+        }
+    }
+
+    private FileStateHandle prepareFileStateHandle(Path path, byte[] data) 
throws IOException {
+        try (InputStream inputStream = new ByteArrayInputStream(data);
+                OutputStream outputStream =
+                        path.getFileSystem().create(path, 
FileSystem.WriteMode.OVERWRITE)) {
+
+            IOUtils.copyBytes(inputStream, outputStream);
+        }
+        return new FileStateHandle(path, data.length);
+    }
+
+    private static void registerFileSystem(FileSystem fs, String scheme) {
+        FileSystem.initialize(
+                new Configuration(),
+                new TestingPluginManager(
+                        singletonMap(
+                                FileSystemFactory.class,
+                                Collections.singleton(
+                                                new FileSystemFactory() {
+                                                    @Override
+                                                    public FileSystem 
create(URI fsUri) {
+                                                        return fs;
+                                                    }
+
+                                                    @Override
+                                                    public String getScheme() {
+                                                        return scheme;
+                                                    }
+                                                })
+                                        .iterator())));
+    }
+}

Reply via email to