Murtadha Hubail has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/1942
Change subject: [ASTERIXDB-2040][STO] Reopen closed file due to interrupts ...................................................................... [ASTERIXDB-2040][STO] Reopen closed file due to interrupts - user model changes: no - storage format changes: no - interface changes: no Details: - When a file channel is closed due to interruption, reopen the file to allow new readers to read it. - Add a test case to test the new behavior. Change-Id: I06d7719801282dbf4a4c16ec3d1cdcac2a62e631 --- M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/pom.xml M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java A hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/IOManagerTest.java 4 files changed, 105 insertions(+), 15 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/42/1942/1 diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/pom.xml b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/pom.xml index 6b8bf73..2f88c41 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/pom.xml +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/pom.xml @@ -78,5 +78,10 @@ <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency> + <dependency> + <groupId>net.jcip</groupId> + <artifactId>jcip-annotations</artifactId> + <version>1.0</version> + </dependency> </dependencies> </project> diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java index f71dcdf..f281b2c 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java @@ -28,12 +28,14 @@ import org.apache.hyracks.api.io.IFileHandle; import org.apache.hyracks.api.io.IIOManager; +import net.jcip.annotations.NotThreadSafe; + +@NotThreadSafe public class FileHandle implements IFileHandle { + private final FileReference fileRef; - private RandomAccessFile raf; - - private FileChannel channel; + private String mode; public FileHandle(FileReference fileRef) { this.fileRef = fileRef; @@ -47,7 +49,6 @@ * @throws IOException */ public void open(IIOManager.FileReadWriteMode rwMode, IIOManager.FileSyncMode syncMode) throws IOException { - String mode; if (!fileRef.getFile().exists()) { throw HyracksDataException.create(ErrorCode.FILE_DOES_NOT_EXIST, fileRef.getAbsolutePath()); } @@ -60,15 +61,12 @@ case METADATA_ASYNC_DATA_ASYNC: mode = "rw"; break; - case METADATA_ASYNC_DATA_SYNC: mode = "rwd"; break; - case METADATA_SYNC_DATA_SYNC: mode = "rws"; break; - default: throw new IllegalArgumentException(); } @@ -77,7 +75,7 @@ default: throw new IllegalArgumentException(); } - raf = new RandomAccessFile(fileRef.getFile(), mode); + ensureOpen(); } public void close() throws IOException { @@ -94,10 +92,16 @@ } public FileChannel getFileChannel() { - if (channel == null) { - channel = raf.getChannel(); - } - return channel; + return raf.getChannel(); } + public synchronized void ensureOpen() throws HyracksDataException { + if (raf == null || !raf.getChannel().isOpen()) { + try { + raf = new RandomAccessFile(fileRef.getFile(), mode); + } catch (IOException e) { + throw HyracksDataException.create(e); + } + } + } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java index 10c7415..e2b968b 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java @@ -22,6 +22,7 @@ import java.io.FilenameFilter; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.channels.ClosedByInterruptException; import java.nio.channels.FileChannel; import java.nio.file.Path; import java.nio.file.Paths; @@ -203,10 +204,13 @@ n += len; } return n; - } catch (HyracksDataException e) { - throw e; } catch (IOException e) { - throw new HyracksDataException(e); + if (e instanceof ClosedByInterruptException) { + Thread.currentThread().interrupt(); + // re-open the closed channel. The channel will be closed during the typical file lifecycle + ((FileHandle) fHandle).ensureOpen(); + } + throw HyracksDataException.create(e); } } diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/IOManagerTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/IOManagerTest.java new file mode 100644 index 0000000..90b882c --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/IOManagerTest.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.storage.common; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.commons.io.FileUtils; +import org.apache.hyracks.api.io.FileReference; +import org.apache.hyracks.api.io.IIOManager; +import org.apache.hyracks.api.util.IoUtil; +import org.apache.hyracks.control.nc.io.FileHandle; +import org.apache.hyracks.test.support.TestStorageManagerComponentHolder; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Test; + +public class IOManagerTest { + + private static File testFile; + + @Test + public void interruptedReadTest() throws IOException, InterruptedException { + final IIOManager ioManager = TestStorageManagerComponentHolder.getIOManager(); + final String fileName = String.valueOf(System.currentTimeMillis()); + final FileReference fileRef = ioManager.resolve(fileName); + testFile = fileRef.getFile(); + // create the file + IoUtil.create(fileRef); + // open file handle + final FileHandle fileHandle = (FileHandle) ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_WRITE, + IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC); + int theOnlyOne = 1; + // write integer into the file + final ByteBuffer writeBuffer = ByteBuffer.allocate(Integer.BYTES).putInt(theOnlyOne); + writeBuffer.flip(); + ioManager.syncWrite(fileHandle, 0, writeBuffer); + + final ByteBuffer readBuffer = ByteBuffer.allocate(Integer.BYTES); + Thread interruptedReader = new Thread(() -> { + try { + Thread.currentThread().interrupt(); + // The file handle will be closed by ClosedByInterruptException + ioManager.syncRead(fileHandle, 0, readBuffer); + } catch (Exception e) { + e.printStackTrace(); + } + }); + interruptedReader.start(); + interruptedReader.join(); + // we should still be able to read from the file handle + ioManager.syncRead(fileHandle, 0, readBuffer); + Assert.assertEquals(theOnlyOne, readBuffer.getInt(0)); + } + + @AfterClass + public static void cleanup() throws Exception { + FileUtils.deleteQuietly(testFile); + } +} -- To view, visit https://asterix-gerrit.ics.uci.edu/1942 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I06d7719801282dbf4a4c16ec3d1cdcac2a62e631 Gerrit-PatchSet: 1 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Murtadha Hubail <mhub...@apache.org>