This is an automated email from the ASF dual-hosted git repository. amanin pushed a commit to branch refactor/strict_storage_connector in repository https://gitbox.apache.org/repos/asf/sis.git
commit 1feae11b8f0f36612ff0706e4749326193f9c161 Author: Alexis Manin <[email protected]> AuthorDate: Thu Mar 11 15:36:24 2021 +0100 refactor(Storage): improve strict storage connector and fully port NetCDF provider to use it. Add support for Channel Data Input and SeekableByteChannel Improve control evolutivity by splitting its logic in a separate API Allow access to underlying storage connector options Minor indentation improvements. --- .../sis/storage/netcdf/NetcdfStoreProvider.java | 41 ++-- .../apache/sis/storage/StrictStorageConnector.java | 240 +++++++++++++++++---- 2 files changed, 223 insertions(+), 58 deletions(-) diff --git a/storage/sis-netcdf/src/main/java/org/apache/sis/storage/netcdf/NetcdfStoreProvider.java b/storage/sis-netcdf/src/main/java/org/apache/sis/storage/netcdf/NetcdfStoreProvider.java index 0e1dbc1..7e8bea4 100644 --- a/storage/sis-netcdf/src/main/java/org/apache/sis/storage/netcdf/NetcdfStoreProvider.java +++ b/storage/sis-netcdf/src/main/java/org/apache/sis/storage/netcdf/NetcdfStoreProvider.java @@ -18,10 +18,11 @@ package org.apache.sis.storage.netcdf; import java.util.logging.Level; import java.util.logging.LogRecord; +import java.util.Optional; import java.io.FileNotFoundException; import java.io.IOException; -import java.nio.ByteBuffer; import java.nio.file.NoSuchFileException; +import java.nio.charset.Charset; import java.lang.reflect.Method; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; @@ -290,34 +291,38 @@ public class NetcdfStoreProvider extends DataStoreProvider { * @throws IOException if an error occurred while opening the netCDF file. * @throws DataStoreException if a logical error (other than I/O) occurred. */ - static Decoder decoder(final StoreListeners listeners, final StorageConnector connector) + static Decoder decoder(final StoreListeners listeners, final StorageConnector connector) throws IOException, DataStoreException { + return decoder(listeners, new StrictStorageConnector(connector)); + } + + static Decoder decoder(final StoreListeners listeners, final StrictStorageConnector connector) throws IOException, DataStoreException { final GeometryLibrary geomlib = connector.getOption(OptionKey.GEOMETRY_LIBRARY); Decoder decoder; - Object keepOpen; - final ChannelDataInput input = connector.getStorageAs(ChannelDataInput.class); - if (input != null) try { - decoder = new ChannelDecoder(input, connector.getOption(OptionKey.ENCODING), geomlib, listeners); - keepOpen = input; + Class<?> viewType; + try { + final Charset charset = connector.getOption(OptionKey.ENCODING); + decoder = connector.useAs(ChannelDataInput.class, input -> new ChannelDecoder(input, charset, geomlib, listeners)); + viewType = ChannelDataInput.class; + } catch (UnsupportedStorageException e) { + final Object storage = connector.unsafe().getStorage(); + decoder = createByReflection(storage, true, geomlib, listeners); + viewType = storage.getClass(); } catch (DataStoreException | ArithmeticException e) { - final String path = connector.getStorageAs(String.class); - if (path != null) try { - decoder = createByReflection(path, false, geomlib, listeners); - keepOpen = path; - } catch (IOException | DataStoreException s) { + Optional<String> path = connector.getPathAsString(); + if (!path.isPresent()) throw e; + try { + decoder = createByReflection(path.get(), false, geomlib, listeners); + viewType = String.class; + } catch (Exception s) { e.addSuppressed(s); throw e; - } else { - throw e; } - } else { - keepOpen = connector.getStorage(); - decoder = createByReflection(keepOpen, true, geomlib, listeners); } if (decoder != null) { - connector.closeAllExcept(keepOpen); + connector.commit(viewType); decoder.applyOtherConventions(); } return decoder; diff --git a/storage/sis-storage/src/main/java/org/apache/sis/storage/StrictStorageConnector.java b/storage/sis-storage/src/main/java/org/apache/sis/storage/StrictStorageConnector.java index 906c0f7..af6be34 100644 --- a/storage/sis-storage/src/main/java/org/apache/sis/storage/StrictStorageConnector.java +++ b/storage/sis-storage/src/main/java/org/apache/sis/storage/StrictStorageConnector.java @@ -5,11 +5,15 @@ import java.io.File; import java.io.IOException; import java.net.URI; import java.nio.ByteBuffer; +import java.nio.channels.SeekableByteChannel; import java.nio.file.Path; import java.util.Optional; import java.util.concurrent.Callable; +import java.util.function.Function; import javax.imageio.stream.ImageInputStream; import javax.sql.DataSource; +import org.apache.sis.internal.storage.io.ChannelDataInput; +import org.apache.sis.setup.OptionKey; import org.apache.sis.util.UnconvertibleObjectException; import org.apache.sis.util.collection.BackingStoreException; @@ -102,37 +106,15 @@ public class StrictStorageConnector implements AutoCloseable { * @throws IOException If given operator throws IOException on execution. * @throws DataStoreException If an error occurs while fetching queried storage. */ - public <T> T useAsBuffer(StorageOperatingFunction<ByteBuffer, T> operator) throws DataStoreException, IOException { - return doUnderControl(() -> { - final ByteBuffer buffer = getOrFail(ByteBuffer.class); - try ( Closeable rewindOnceDone = buffer::rewind ) { - return operator.apply(buffer); - } - }); + public <T> T useAsBuffer(StorageOperatingFunction<? super ByteBuffer, T> operator) throws DataStoreException, IOException { + return use(ByteBuffer.class, new ByteBufferControl(), operator); } /** * Specialization of {@link #useAs(Class, StorageOperatingFunction)} for {@link ImageInputStream ImageIO API}. */ - public <T> T useAsImageInputStream(StorageOperatingFunction<ImageInputStream, T> operator) throws IOException, DataStoreException { - return doUnderControl(() -> { - ImageInputStream stream = getOrFail(ImageInputStream.class); - final long positionCtrl = stream.getStreamPosition(); - stream.mark(); - T result; - try ( Closeable rewindOnceDone = stream::reset ) { - result = operator.apply(stream); - } - final long rewindPosition = stream.getStreamPosition(); - if (rewindPosition != positionCtrl) { - concurrentFlag = -1; // mark this connector as closed/not valid anymore - throw new StorageControlException(String.format( - "Operator has messed with stream marks. Rewind should have positioned at %d, but ended at %d", - positionCtrl, rewindPosition - ), ImageInputStream.class); - } - return result; - }); + public <T> T useAsImageInputStream(StorageOperatingFunction<? super ImageInputStream, T> operator) throws IOException, DataStoreException { + return use(ImageInputStream.class, new StreamControl<>(ImageInputStream.class, IISAdapter::new), operator); } /** @@ -161,18 +143,46 @@ public class StrictStorageConnector implements AutoCloseable { * @throws DataStoreException If an error occurs while fetching queried storage. */ public <S, T> T useAs(Class<S> storageType, StorageOperatingFunction<? super S, ? extends T> operator) throws IOException, DataStoreException { - if (ByteBuffer.class.isAssignableFrom(storageType)) return useAsBuffer((StorageOperatingFunction<ByteBuffer, T>) operator); - else if (ImageInputStream.class.isAssignableFrom(storageType)) return useAsImageInputStream((StorageOperatingFunction<ImageInputStream, T>) operator); - else if (URI.class.isAssignableFrom(storageType)) return ((StorageOperatingFunction<URI, T>) operator).apply( - getURI().orElseThrow(() -> new UnsupportedStorageException("Cannot acquire an URI")) - ); - else if (Path.class.isAssignableFrom(storageType)) return ((StorageOperatingFunction<Path, T>) operator).apply( - getPath().orElseThrow(() -> new UnsupportedStorageException("Cannot acquire a path")) - ); - else if (File.class.isAssignableFrom(storageType)) return ((StorageOperatingFunction<File, T>) operator).apply( - getPath().map(p -> p.toFile()).orElseThrow(() -> new UnsupportedStorageException("Cannot acquire a file")) - ); - else throw new UnsupportedStorageException("Queried storage type is not supported yet: "+storageType); + if (ByteBuffer.class.isAssignableFrom(storageType)) { + return useAsBuffer((StorageOperatingFunction<? super ByteBuffer, T>) operator); + } else if (ImageInputStream.class.isAssignableFrom(storageType)) { + return use(storageType, new StreamControl<>(storageType, stream -> new IISAdapter((ImageInputStream) stream)), operator); + } else if (ChannelDataInput.class.isAssignableFrom(storageType)) { + return use(storageType, new StreamControl<>(storageType, stream -> new CDIAdapter((ChannelDataInput) stream)), operator); + } else if (SeekableByteChannel.class.isAssignableFrom(storageType)) { + return use(storageType, new StreamControl<>(storageType, stream -> new SBCAdapter((SeekableByteChannel) stream)), operator); + } else if (URI.class.isAssignableFrom(storageType)) { + final URI input = getURI().orElseThrow(() -> new UnsupportedStorageException("Cannot acquire an URI")); + return ((StorageOperatingFunction<? super URI, T>) operator).apply(input); + } else if (Path.class.isAssignableFrom(storageType)) { + final Path input = getPath().orElseThrow(() -> new UnsupportedStorageException("Cannot acquire a path")); + return ((StorageOperatingFunction<? super Path, T>) operator).apply(input); + } + else if (File.class.isAssignableFrom(storageType)) { + final Path input = getPath().orElseThrow(() -> new UnsupportedStorageException("Cannot acquire a path")); + final File inputFile; + try { + inputFile = input.toFile(); + } catch (UnsupportedOperationException e) { + throw new UnsupportedStorageException("Storage is not a local path (not a file on default file-system).", e); + } + return ((StorageOperatingFunction<? super File, T>) operator).apply(inputFile); + } + + throw new UnsupportedStorageException("Queried storage type is not supported yet: "+storageType); + } + + private <I, O> O use(Class<I> storageType, ControlStrategy<I> control, StorageOperatingFunction<? super I, ? extends O> action) throws IOException, DataStoreException { + return doUnderControl(() -> { + final I rawInput = getOrFail(storageType); + final ControlOperator<I> op = control.init(rawInput); + try ( Closeable controlAfterUse = op::postControl ) { + return action.apply(op.getStorage()); + } catch (StorageControlException e) { + concurrentFlag = -1; + throw e; + } + }); } /** @@ -187,7 +197,7 @@ public class StrictStorageConnector implements AutoCloseable { * @throws DataStoreException Same reasons as for IOException + can happen if queried storage is of unsupported type. * @throws IllegalStateException If this connector is already closed. */ - protected <V> V doUnderControl(StorageCallable<V> operator) throws IOException, DataStoreException { + protected <V> V doUnderControl(StorageCallable<? extends V> operator) throws IOException, DataStoreException { if (concurrentFlag < 0) throw new IllegalStateException("..."); if (concurrentFlag != 0) throw new ConcurrentReadException("..."); concurrentFlag++; @@ -252,6 +262,10 @@ public class StrictStorageConnector implements AutoCloseable { storage.closeAllExcept(committedStorage); } + public <T> T getOption(OptionKey<T> key) { + return storage.getOption(key); + } + /** * * @return A connector that does not provides control over resource usage. It does not automatically rewind @@ -271,7 +285,10 @@ public class StrictStorageConnector implements AutoCloseable { O apply(I storage) throws IOException, DataStoreException; } - public class StorageControlException extends RuntimeException { + /** + * indicates a failure over resource health control (rewind after use, etc.); + */ + public static class StorageControlException extends RuntimeException { public final Class<?> storageType; public StorageControlException(Class<?> storageType) { @@ -291,4 +308,147 @@ public class StrictStorageConnector implements AutoCloseable { this.storageType = storageType; } } + + interface ControlStrategy<S> { + Class<S> getType(); + + ControlOperator<S> init(S storage) throws IOException; + } + + interface ControlOperator<S> { + S getStorage(); + void postControl() throws StorageControlException, IOException; + } + + private static class ByteBufferControl implements ControlStrategy<ByteBuffer> { + + @Override + public Class<ByteBuffer> getType() { + return ByteBuffer.class; + } + + @Override + public ControlOperator<ByteBuffer> init(ByteBuffer storage) { + final ByteBuffer freezed = storage.asReadOnlyBuffer(); + return new ControlOperator<ByteBuffer>() { + @Override + public ByteBuffer getStorage() { + return freezed; + } + + @Override + public void postControl() throws StorageControlException { + storage.rewind(); + } + }; + } + } + + private static class StreamControl<T> implements ControlStrategy<T> { + + private final Class<T> viewType; + private final Function<T, ResetableStream> adapter; + + public StreamControl(Class<T> viewType, Function<T, ResetableStream> adapter) { + this.viewType = viewType; + this.adapter = adapter; + } + + @Override + public Class<T> getType() { + return viewType; + } + + @Override + public ControlOperator<T> init(T storage) throws IOException { + final ResetableStream adaptedStorage = adapter.apply(storage); + final long positionCtrl = adaptedStorage.mark(); + + return new ControlOperator<T>() { + @Override + public T getStorage() { + return storage; + } + + @Override + public void postControl() throws StorageControlException, IOException { + adaptedStorage.reset(); + + final long rewindPosition = adaptedStorage.mark(); + if (rewindPosition != positionCtrl) { + throw new StorageControlException(String.format( + "Operator has messed with stream marks. Rewind should have positioned at %d, but ended at %d", + positionCtrl, rewindPosition + ), viewType); + } + } + }; + } + } + + private interface ResetableStream { + long mark() throws IOException; + void reset() throws IOException; + } + + private static class IISAdapter implements ResetableStream { + private final ImageInputStream source; + + IISAdapter(ImageInputStream source) { + this.source = source; + } + + @Override + public long mark() throws IOException { + source.mark(); + return source.getStreamPosition(); + } + + @Override + public void reset() throws IOException { + source.reset(); + } + } + + private static class CDIAdapter implements ResetableStream { + private final ChannelDataInput source; + + CDIAdapter(ChannelDataInput source) { + this.source = source; + } + + @Override + public long mark() throws IOException { + source.mark(); + return source.getStreamPosition(); + } + + @Override + public void reset() throws IOException { + source.reset(); + } + } + + private static class SBCAdapter implements ResetableStream { + final SeekableByteChannel source; + + long mark = -1; + + SBCAdapter(SeekableByteChannel source) { + this.source = source; + } + + @Override + public long mark() throws IOException { + final long position = source.position(); + if (mark >= 0 && position != mark) throw new IllegalStateException("This stream is already marked, cannot add one."); + this.mark = position; + return mark; + } + + @Override + public void reset() throws IOException { + if (mark >= 0) source.position(mark); + } + } }
