This is an automated email from the ASF dual-hosted git repository.

amanin pushed a commit to branch refactor/strict_storage_connector
in repository https://gitbox.apache.org/repos/asf/sis.git

commit 1feae11b8f0f36612ff0706e4749326193f9c161
Author: Alexis Manin <[email protected]>
AuthorDate: Thu Mar 11 15:36:24 2021 +0100

    refactor(Storage): improve strict storage connector and fully port NetCDF 
provider to use it.
    
    Add support for Channel Data Input and SeekableByteChannel
    
    Improve control evolutivity by splitting its logic in a separate API
    
    Allow access to underlying storage connector options
    
    Minor indentation improvements.
---
 .../sis/storage/netcdf/NetcdfStoreProvider.java    |  41 ++--
 .../apache/sis/storage/StrictStorageConnector.java | 240 +++++++++++++++++----
 2 files changed, 223 insertions(+), 58 deletions(-)

diff --git 
a/storage/sis-netcdf/src/main/java/org/apache/sis/storage/netcdf/NetcdfStoreProvider.java
 
b/storage/sis-netcdf/src/main/java/org/apache/sis/storage/netcdf/NetcdfStoreProvider.java
index 0e1dbc1..7e8bea4 100644
--- 
a/storage/sis-netcdf/src/main/java/org/apache/sis/storage/netcdf/NetcdfStoreProvider.java
+++ 
b/storage/sis-netcdf/src/main/java/org/apache/sis/storage/netcdf/NetcdfStoreProvider.java
@@ -18,10 +18,11 @@ package org.apache.sis.storage.netcdf;
 
 import java.util.logging.Level;
 import java.util.logging.LogRecord;
+import java.util.Optional;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.nio.file.NoSuchFileException;
+import java.nio.charset.Charset;
 import java.lang.reflect.Method;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
@@ -290,34 +291,38 @@ public class NetcdfStoreProvider extends 
DataStoreProvider {
      * @throws IOException if an error occurred while opening the netCDF file.
      * @throws DataStoreException if a logical error (other than I/O) occurred.
      */
-    static Decoder decoder(final StoreListeners listeners, final 
StorageConnector connector)
+    static Decoder decoder(final StoreListeners listeners, final 
StorageConnector connector) throws IOException, DataStoreException {
+        return decoder(listeners, new StrictStorageConnector(connector));
+    }
+
+    static Decoder decoder(final StoreListeners listeners, final 
StrictStorageConnector connector)
             throws IOException, DataStoreException
     {
         final GeometryLibrary geomlib = 
connector.getOption(OptionKey.GEOMETRY_LIBRARY);
         Decoder decoder;
-        Object keepOpen;
-        final ChannelDataInput input = 
connector.getStorageAs(ChannelDataInput.class);
-        if (input != null) try {
-            decoder = new ChannelDecoder(input, 
connector.getOption(OptionKey.ENCODING), geomlib, listeners);
-            keepOpen = input;
+        Class<?> viewType;
+        try {
+            final Charset charset = connector.getOption(OptionKey.ENCODING);
+            decoder = connector.useAs(ChannelDataInput.class, input -> new 
ChannelDecoder(input, charset, geomlib, listeners));
+            viewType = ChannelDataInput.class;
+        } catch (UnsupportedStorageException e) {
+            final Object storage = connector.unsafe().getStorage();
+            decoder = createByReflection(storage, true, geomlib, listeners);
+            viewType = storage.getClass();
         } catch (DataStoreException | ArithmeticException e) {
-            final String path = connector.getStorageAs(String.class);
-            if (path != null) try {
-                decoder = createByReflection(path, false, geomlib, listeners);
-                keepOpen = path;
-            } catch (IOException | DataStoreException s) {
+            Optional<String> path = connector.getPathAsString();
+            if (!path.isPresent()) throw e;
+            try {
+                decoder = createByReflection(path.get(), false, geomlib, 
listeners);
+                viewType = String.class;
+            } catch (Exception s) {
                 e.addSuppressed(s);
                 throw e;
-            } else {
-                throw e;
             }
-        } else {
-            keepOpen = connector.getStorage();
-            decoder = createByReflection(keepOpen, true, geomlib, listeners);
         }
 
         if (decoder != null) {
-            connector.closeAllExcept(keepOpen);
+            connector.commit(viewType);
             decoder.applyOtherConventions();
         }
         return decoder;
diff --git 
a/storage/sis-storage/src/main/java/org/apache/sis/storage/StrictStorageConnector.java
 
b/storage/sis-storage/src/main/java/org/apache/sis/storage/StrictStorageConnector.java
index 906c0f7..af6be34 100644
--- 
a/storage/sis-storage/src/main/java/org/apache/sis/storage/StrictStorageConnector.java
+++ 
b/storage/sis-storage/src/main/java/org/apache/sis/storage/StrictStorageConnector.java
@@ -5,11 +5,15 @@ import java.io.File;
 import java.io.IOException;
 import java.net.URI;
 import java.nio.ByteBuffer;
+import java.nio.channels.SeekableByteChannel;
 import java.nio.file.Path;
 import java.util.Optional;
 import java.util.concurrent.Callable;
+import java.util.function.Function;
 import javax.imageio.stream.ImageInputStream;
 import javax.sql.DataSource;
+import org.apache.sis.internal.storage.io.ChannelDataInput;
+import org.apache.sis.setup.OptionKey;
 import org.apache.sis.util.UnconvertibleObjectException;
 import org.apache.sis.util.collection.BackingStoreException;
 
@@ -102,37 +106,15 @@ public class StrictStorageConnector implements 
AutoCloseable {
      * @throws IOException If given operator throws IOException on execution.
      * @throws DataStoreException If an error occurs while fetching queried 
storage.
      */
-    public <T> T useAsBuffer(StorageOperatingFunction<ByteBuffer, T> operator) 
throws DataStoreException, IOException {
-        return doUnderControl(() -> {
-            final ByteBuffer buffer = getOrFail(ByteBuffer.class);
-            try ( Closeable rewindOnceDone = buffer::rewind ) {
-                return operator.apply(buffer);
-            }
-        });
+    public <T> T useAsBuffer(StorageOperatingFunction<? super ByteBuffer, T> 
operator) throws DataStoreException, IOException {
+        return use(ByteBuffer.class, new ByteBufferControl(), operator);
     }
 
     /**
      * Specialization of {@link #useAs(Class, StorageOperatingFunction)} for 
{@link ImageInputStream ImageIO API}.
      */
-    public <T> T 
useAsImageInputStream(StorageOperatingFunction<ImageInputStream, T> operator) 
throws IOException, DataStoreException {
-        return doUnderControl(() -> {
-            ImageInputStream stream = getOrFail(ImageInputStream.class);
-            final long positionCtrl = stream.getStreamPosition();
-            stream.mark();
-            T result;
-            try ( Closeable rewindOnceDone = stream::reset ) {
-                result = operator.apply(stream);
-            }
-            final long rewindPosition = stream.getStreamPosition();
-            if (rewindPosition != positionCtrl) {
-                concurrentFlag = -1; // mark this connector as closed/not 
valid anymore
-                throw new StorageControlException(String.format(
-                        "Operator has messed with stream marks. Rewind should 
have positioned at %d, but ended at %d",
-                        positionCtrl, rewindPosition
-                ), ImageInputStream.class);
-            }
-            return result;
-        });
+    public <T> T useAsImageInputStream(StorageOperatingFunction<? super 
ImageInputStream, T> operator) throws IOException, DataStoreException {
+        return use(ImageInputStream.class, new 
StreamControl<>(ImageInputStream.class, IISAdapter::new), operator);
     }
 
     /**
@@ -161,18 +143,46 @@ public class StrictStorageConnector implements 
AutoCloseable {
      * @throws DataStoreException If an error occurs while fetching queried 
storage.
      */
     public <S, T> T useAs(Class<S> storageType, StorageOperatingFunction<? 
super S, ? extends T> operator) throws IOException, DataStoreException {
-        if (ByteBuffer.class.isAssignableFrom(storageType)) return 
useAsBuffer((StorageOperatingFunction<ByteBuffer, T>) operator);
-        else if (ImageInputStream.class.isAssignableFrom(storageType)) return 
useAsImageInputStream((StorageOperatingFunction<ImageInputStream, T>) operator);
-        else if (URI.class.isAssignableFrom(storageType)) return 
((StorageOperatingFunction<URI, T>) operator).apply(
-                getURI().orElseThrow(() -> new 
UnsupportedStorageException("Cannot acquire an URI"))
-        );
-        else if (Path.class.isAssignableFrom(storageType)) return 
((StorageOperatingFunction<Path, T>) operator).apply(
-                getPath().orElseThrow(() -> new 
UnsupportedStorageException("Cannot acquire a path"))
-        );
-        else if (File.class.isAssignableFrom(storageType)) return 
((StorageOperatingFunction<File, T>) operator).apply(
-                getPath().map(p -> p.toFile()).orElseThrow(() -> new 
UnsupportedStorageException("Cannot acquire a file"))
-        );
-        else throw new UnsupportedStorageException("Queried storage type is 
not supported yet: "+storageType);
+        if (ByteBuffer.class.isAssignableFrom(storageType)) {
+            return useAsBuffer((StorageOperatingFunction<? super ByteBuffer, 
T>) operator);
+        } else if (ImageInputStream.class.isAssignableFrom(storageType)) {
+            return use(storageType, new StreamControl<>(storageType, stream -> 
new IISAdapter((ImageInputStream) stream)), operator);
+        } else if (ChannelDataInput.class.isAssignableFrom(storageType)) {
+            return use(storageType, new StreamControl<>(storageType, stream -> 
new CDIAdapter((ChannelDataInput) stream)), operator);
+        } else if (SeekableByteChannel.class.isAssignableFrom(storageType)) {
+            return use(storageType, new StreamControl<>(storageType, stream -> 
new SBCAdapter((SeekableByteChannel) stream)), operator);
+        } else if (URI.class.isAssignableFrom(storageType)) {
+            final URI input = getURI().orElseThrow(() -> new 
UnsupportedStorageException("Cannot acquire an URI"));
+            return ((StorageOperatingFunction<? super URI, T>) 
operator).apply(input);
+        } else if (Path.class.isAssignableFrom(storageType)) {
+            final Path input = getPath().orElseThrow(() -> new 
UnsupportedStorageException("Cannot acquire a path"));
+            return ((StorageOperatingFunction<? super Path, T>) 
operator).apply(input);
+        }
+        else if (File.class.isAssignableFrom(storageType)) {
+            final Path input = getPath().orElseThrow(() -> new 
UnsupportedStorageException("Cannot acquire a path"));
+            final File inputFile;
+            try {
+                inputFile = input.toFile();
+            } catch (UnsupportedOperationException e) {
+                throw new UnsupportedStorageException("Storage is not a local 
path (not a file on default file-system).", e);
+            }
+            return ((StorageOperatingFunction<? super File, T>) 
operator).apply(inputFile);
+        }
+
+        throw new UnsupportedStorageException("Queried storage type is not 
supported yet: "+storageType);
+    }
+
+    private <I, O> O use(Class<I> storageType, ControlStrategy<I> control, 
StorageOperatingFunction<? super I, ? extends O> action) throws IOException, 
DataStoreException {
+        return doUnderControl(() -> {
+            final I rawInput = getOrFail(storageType);
+            final ControlOperator<I> op = control.init(rawInput);
+            try ( Closeable controlAfterUse = op::postControl ) {
+                return action.apply(op.getStorage());
+            } catch (StorageControlException e) {
+                concurrentFlag = -1;
+                throw e;
+            }
+        });
     }
 
     /**
@@ -187,7 +197,7 @@ public class StrictStorageConnector implements 
AutoCloseable {
      * @throws DataStoreException Same reasons as for IOException + can happen 
if queried storage is of unsupported type.
      * @throws IllegalStateException If this connector is already closed.
      */
-    protected <V> V doUnderControl(StorageCallable<V> operator) throws 
IOException, DataStoreException {
+    protected <V> V doUnderControl(StorageCallable<? extends V> operator) 
throws IOException, DataStoreException {
         if (concurrentFlag < 0) throw new IllegalStateException("...");
         if (concurrentFlag != 0) throw new ConcurrentReadException("...");
         concurrentFlag++;
@@ -252,6 +262,10 @@ public class StrictStorageConnector implements 
AutoCloseable {
         storage.closeAllExcept(committedStorage);
     }
 
+    public <T> T getOption(OptionKey<T> key) {
+        return storage.getOption(key);
+    }
+
     /**
      *
      * @return A connector that does not provides control over resource usage. 
It does not automatically rewind
@@ -271,7 +285,10 @@ public class StrictStorageConnector implements 
AutoCloseable {
         O apply(I storage) throws IOException, DataStoreException;
     }
 
-    public class StorageControlException extends RuntimeException {
+    /**
+     * indicates a failure over resource health control (rewind after use, 
etc.);
+     */
+    public static class StorageControlException extends RuntimeException {
         public final Class<?> storageType;
 
         public StorageControlException(Class<?> storageType) {
@@ -291,4 +308,147 @@ public class StrictStorageConnector implements 
AutoCloseable {
             this.storageType = storageType;
         }
     }
+
+    interface ControlStrategy<S> {
+        Class<S> getType();
+
+        ControlOperator<S> init(S storage) throws IOException;
+    }
+
+    interface ControlOperator<S> {
+        S getStorage();
+        void postControl() throws StorageControlException, IOException;
+    }
+
+    private static class ByteBufferControl implements 
ControlStrategy<ByteBuffer> {
+
+        @Override
+        public Class<ByteBuffer> getType() {
+            return ByteBuffer.class;
+        }
+
+        @Override
+        public ControlOperator<ByteBuffer> init(ByteBuffer storage) {
+            final ByteBuffer freezed = storage.asReadOnlyBuffer();
+            return new ControlOperator<ByteBuffer>() {
+                @Override
+                public ByteBuffer getStorage() {
+                    return freezed;
+                }
+
+                @Override
+                public void postControl() throws StorageControlException {
+                    storage.rewind();
+                }
+            };
+        }
+    }
+
+    private static class StreamControl<T> implements ControlStrategy<T> {
+
+        private final Class<T> viewType;
+        private final Function<T, ResetableStream> adapter;
+
+        public StreamControl(Class<T> viewType, Function<T, ResetableStream> 
adapter) {
+            this.viewType = viewType;
+            this.adapter = adapter;
+        }
+
+        @Override
+        public Class<T> getType() {
+            return viewType;
+        }
+
+        @Override
+        public ControlOperator<T> init(T storage) throws IOException {
+            final ResetableStream adaptedStorage = adapter.apply(storage);
+            final long positionCtrl = adaptedStorage.mark();
+
+            return new ControlOperator<T>() {
+                @Override
+                public T getStorage() {
+                    return storage;
+                }
+
+                @Override
+                public void postControl() throws StorageControlException, 
IOException {
+                    adaptedStorage.reset();
+
+                    final long rewindPosition = adaptedStorage.mark();
+                    if (rewindPosition != positionCtrl) {
+                        throw new StorageControlException(String.format(
+                                "Operator has messed with stream marks. Rewind 
should have positioned at %d, but ended at %d",
+                                positionCtrl, rewindPosition
+                        ), viewType);
+                    }
+                }
+            };
+        }
+    }
+
+    private interface ResetableStream {
+        long mark() throws IOException;
+        void reset() throws IOException;
+    }
+
+    private static class IISAdapter implements ResetableStream {
+        private final ImageInputStream source;
+
+        IISAdapter(ImageInputStream source) {
+            this.source = source;
+        }
+
+        @Override
+        public long mark() throws IOException {
+            source.mark();
+            return source.getStreamPosition();
+        }
+
+        @Override
+        public void reset() throws IOException {
+            source.reset();
+        }
+    }
+
+    private static class CDIAdapter implements ResetableStream {
+        private final ChannelDataInput source;
+
+        CDIAdapter(ChannelDataInput source) {
+            this.source = source;
+        }
+
+        @Override
+        public long mark() throws IOException {
+            source.mark();
+            return source.getStreamPosition();
+        }
+
+        @Override
+        public void reset() throws IOException {
+            source.reset();
+        }
+    }
+
+    private static class SBCAdapter implements ResetableStream {
+        final SeekableByteChannel source;
+
+        long mark = -1;
+
+        SBCAdapter(SeekableByteChannel source) {
+            this.source = source;
+        }
+
+        @Override
+        public long mark() throws IOException {
+            final long position = source.position();
+            if (mark >= 0 && position != mark) throw new 
IllegalStateException("This stream is already marked, cannot add one.");
+            this.mark = position;
+            return mark;
+        }
+
+        @Override
+        public void reset() throws IOException {
+            if (mark >= 0) source.position(mark);
+        }
+    }
 }

Reply via email to