Murtadha Hubail has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1942

Change subject: [ASTERIXDB-2040][STO] Reopen closed file due to interrupts
......................................................................

[ASTERIXDB-2040][STO] Reopen closed file due to interrupts

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
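- When a file channel is closed because the reading thread was
  interrupted (ClosedByInterruptException), reopen the file so that
  subsequent readers can still read through the same file handle.
- Add a test case that interrupts a reader and then verifies the
  file handle can still be read.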

Change-Id: I06d7719801282dbf4a4c16ec3d1cdcac2a62e631
---
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/pom.xml
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
A 
hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/IOManagerTest.java
4 files changed, 105 insertions(+), 15 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/42/1942/1

diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/pom.xml 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/pom.xml
index 6b8bf73..2f88c41 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/pom.xml
@@ -78,5 +78,10 @@
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
     </dependency>
+    <dependency>
+      <groupId>net.jcip</groupId>
+      <artifactId>jcip-annotations</artifactId>
+      <version>1.0</version>
+    </dependency>
   </dependencies>
 </project>
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java
index f71dcdf..f281b2c 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/FileHandle.java
@@ -28,12 +28,14 @@
 import org.apache.hyracks.api.io.IFileHandle;
 import org.apache.hyracks.api.io.IIOManager;
 
+import net.jcip.annotations.NotThreadSafe;
+
+@NotThreadSafe
 public class FileHandle implements IFileHandle {
+
     private final FileReference fileRef;
-
     private RandomAccessFile raf;
-
-    private FileChannel channel;
+    private String mode;
 
     public FileHandle(FileReference fileRef) {
         this.fileRef = fileRef;
@@ -47,7 +49,6 @@
      * @throws IOException
      */
     public void open(IIOManager.FileReadWriteMode rwMode, 
IIOManager.FileSyncMode syncMode) throws IOException {
-        String mode;
         if (!fileRef.getFile().exists()) {
             throw HyracksDataException.create(ErrorCode.FILE_DOES_NOT_EXIST, 
fileRef.getAbsolutePath());
         }
@@ -60,15 +61,12 @@
                     case METADATA_ASYNC_DATA_ASYNC:
                         mode = "rw";
                         break;
-
                     case METADATA_ASYNC_DATA_SYNC:
                         mode = "rwd";
                         break;
-
                     case METADATA_SYNC_DATA_SYNC:
                         mode = "rws";
                         break;
-
                     default:
                         throw new IllegalArgumentException();
                 }
@@ -77,7 +75,7 @@
             default:
                 throw new IllegalArgumentException();
         }
-        raf = new RandomAccessFile(fileRef.getFile(), mode);
+        ensureOpen();
     }
 
     public void close() throws IOException {
@@ -94,10 +92,16 @@
     }
 
     public FileChannel getFileChannel() {
-        if (channel == null) {
-            channel = raf.getChannel();
-        }
-        return channel;
+        return raf.getChannel();
     }
 
+    public synchronized void ensureOpen() throws HyracksDataException {
+        if (raf == null || !raf.getChannel().isOpen()) {
+            try {
+                raf = new RandomAccessFile(fileRef.getFile(), mode);
+            } catch (IOException e) {
+                throw HyracksDataException.create(e);
+            }
+        }
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
index 10c7415..e2b968b 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
@@ -22,6 +22,7 @@
 import java.io.FilenameFilter;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.ClosedByInterruptException;
 import java.nio.channels.FileChannel;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -203,10 +204,13 @@
                 n += len;
             }
             return n;
-        } catch (HyracksDataException e) {
-            throw e;
         } catch (IOException e) {
-            throw new HyracksDataException(e);
+            if (e instanceof ClosedByInterruptException) {
+                Thread.currentThread().interrupt();
+                // re-open the closed channel. The channel will be closed 
during the typical file lifecycle
+                ((FileHandle) fHandle).ensureOpen();
+            }
+            throw HyracksDataException.create(e);
         }
     }
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/IOManagerTest.java
 
b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/IOManagerTest.java
new file mode 100644
index 0000000..90b882c
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-common-test/src/test/java/org/apache/hyracks/storage/common/IOManagerTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.common;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.api.util.IoUtil;
+import org.apache.hyracks.control.nc.io.FileHandle;
+import org.apache.hyracks.test.support.TestStorageManagerComponentHolder;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class IOManagerTest {
+
+    private static File testFile;
+
+    @Test
+    public void interruptedReadTest() throws IOException, InterruptedException 
{
+        final IIOManager ioManager = 
TestStorageManagerComponentHolder.getIOManager();
+        final String fileName = String.valueOf(System.currentTimeMillis());
+        final FileReference fileRef = ioManager.resolve(fileName);
+        testFile = fileRef.getFile();
+        // create the file
+        IoUtil.create(fileRef);
+        // open file handle
+        final FileHandle fileHandle = (FileHandle) ioManager.open(fileRef, 
IIOManager.FileReadWriteMode.READ_WRITE,
+                IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
+        int theOnlyOne = 1;
+        // write integer into the file
+        final ByteBuffer writeBuffer = 
ByteBuffer.allocate(Integer.BYTES).putInt(theOnlyOne);
+        writeBuffer.flip();
+        ioManager.syncWrite(fileHandle, 0, writeBuffer);
+
+        final ByteBuffer readBuffer = ByteBuffer.allocate(Integer.BYTES);
+        Thread interruptedReader = new Thread(() -> {
+            try {
+                Thread.currentThread().interrupt();
+                // The file handle will be closed by ClosedByInterruptException
+                ioManager.syncRead(fileHandle, 0, readBuffer);
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        });
+        interruptedReader.start();
+        interruptedReader.join();
+        // we should still be able to read from the file handle
+        ioManager.syncRead(fileHandle, 0, readBuffer);
+        Assert.assertEquals(theOnlyOne, readBuffer.getInt(0));
+    }
+
+    @AfterClass
+    public static void cleanup() throws Exception {
+        FileUtils.deleteQuietly(testFile);
+    }
+}

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1942
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I06d7719801282dbf4a4c16ec3d1cdcac2a62e631
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <mhub...@apache.org>

Reply via email to