This is an automated email from the ASF dual-hosted git repository. desruisseaux pushed a commit to branch geoapi-4.0 in repository https://gitbox.apache.org/repos/asf/sis.git
commit 67914142d82d61a67bd95779bac1767ae67de5d9 Author: Martin Desruisseaux <martin.desruisse...@geomatys.com> AuthorDate: Fri Feb 10 17:22:42 2023 +0100 Show which data are cached when a connection does not support HTTP range. This is for avoiding the impression that the application is blocked. --- .../apache/sis/internal/gui/io/FileAccessItem.java | 348 +++++++++++++++++---- .../apache/sis/internal/gui/io/FileAccessView.java | 41 ++- .../apache/sis/internal/gui/io/package-info.java | 2 +- .../apache/sis/cloud/aws/s3/CachedByteChannel.java | 20 +- .../sis/internal/storage/io/ChannelFactory.java | 29 +- .../internal/storage/io/FileCacheByteChannel.java | 117 ++++++- .../sis/internal/storage/io/HttpByteChannel.java | 14 +- .../org/apache/sis/storage/StorageConnector.java | 31 +- .../storage/io/FileCacheByteChannelTest.java | 16 +- 9 files changed, 474 insertions(+), 144 deletions(-) diff --git a/application/sis-javafx/src/main/java/org/apache/sis/internal/gui/io/FileAccessItem.java b/application/sis-javafx/src/main/java/org/apache/sis/internal/gui/io/FileAccessItem.java index 1b11bbe936..328c516af9 100644 --- a/application/sis-javafx/src/main/java/org/apache/sis/internal/gui/io/FileAccessItem.java +++ b/application/sis-javafx/src/main/java/org/apache/sis/internal/gui/io/FileAccessItem.java @@ -18,7 +18,10 @@ package org.apache.sis.internal.gui.io; import java.util.List; import java.util.ListIterator; +import java.util.EnumMap; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.nio.ByteBuffer; import java.nio.channels.SeekableByteChannel; import javafx.application.Platform; @@ -35,6 +38,7 @@ import javafx.scene.shape.StrokeType; import javafx.animation.FadeTransition; import javafx.util.Duration; import org.apache.sis.measure.Range; +import org.apache.sis.internal.util.Numerics; import org.apache.sis.util.collection.RangeSet; @@ -43,7 +47,7 @@ import org.apache.sis.util.collection.RangeSet; * This is a row in the table 
shown by {@link FileAccessView} table. * * @author Martin Desruisseaux (Geomatys) - * @version 1.2 + * @version 1.4 * @since 1.2 */ final class FileAccessItem implements Runnable, EventHandler<ActionEvent> { @@ -62,16 +66,6 @@ final class FileAccessItem implements Runnable, EventHandler<ActionEvent> { */ private static final int MARGIN_RIGHT = 6; - /** - * Color to use for filling the rectangles. - */ - private static final Color FILL_COLOR = Color.LIGHTSEAGREEN; - - /** - * Color to use for rectangles border. - */ - private static final Color BORDER_COLOR = FILL_COLOR.darker(); - /** * Width of the cursor in pixels. */ @@ -100,9 +94,33 @@ final class FileAccessItem implements Runnable, EventHandler<ActionEvent> { final String filename; /** - * Range of bytes on which a read or write operation has been performed. + * The access mode. Rendering is done in enumeration order. + */ + private enum Mode { + /** Cache a range of bytes. */ CACHE(Color.LIGHTGRAY), + /** Read a range of bytes. */ READ(Color.LIGHTSEAGREEN), + /** Write a range of bytes. */ WRITE(Color.LIGHTCORAL); + + /** The colors to use for rendering the rectangle border and fill. */ + private final Color border, fill; + + /** Creates a new enumeration value. */ + private Mode(final Color fill) { + this.fill = fill; + border = fill.darker(); + } + + /** Sets the colors of the given rectangle for representing this mode. */ + final void colorize(final Rectangle r) { + r.setStroke(border); + r.setFill(fill); + } + } + + /** + * Range of bytes on which read or write operations have been performed. */ - private final RangeSet<Long> accessRanges; + private final EnumMap<Mode, RangeSet<Long>> accessRanges; /** * Visual representation of {@link #accessRanges}.
@@ -159,7 +177,7 @@ final class FileAccessItem implements Runnable, EventHandler<ActionEvent> { staticGroup = staticView.getChildren(); seeksGroup = seeksView .getChildren(); accessView = new Pane(staticView, seeksView); - accessRanges = RangeSet.create(Long.class, true, false); + accessRanges = new EnumMap<>(Mode.class); staticView.setAutoSizeChildren(false); /* * Background rectangle. @@ -167,7 +185,7 @@ final class FileAccessItem implements Runnable, EventHandler<ActionEvent> { final Rectangle background = new Rectangle(); background.setY(MARGIN_TOP); background.setHeight(HEIGHT); - background.setStroke(FILL_COLOR.brighter()); + background.setStroke(Mode.READ.fill.brighter()); background.setFill(Color.TRANSPARENT); background.setStrokeType(StrokeType.INSIDE); staticGroup.add(background); @@ -218,35 +236,58 @@ final class FileAccessItem implements Runnable, EventHandler<ActionEvent> { * Reports a read or write operation on a range of bytes. * This method must be invoked from JavaFX thread. * - * @param position offset of the first byte read or written. - * @param count number of bytes read or written. - * @param write {@code false} for a read operation, or {@code true} for a write operation. + * @param lower offset of the first byte read or written. + * @param upper offset after the last byte read or written. + * @param mode whether a read, write or cache operation is performed. + * + * @see #addRangeLater(long, long, Mode) */ - private void addRange(final long position, final int count, final boolean write) { - cursorPosition = position; - final boolean add = accessRanges.add(position, position + count); + private void addRange(final long lower, final long upper, final Mode mode) { + cursorPosition = lower; + /* + * Add the range for the specified mode and remove it for all other modes. + * Consequently the visual component will show the last access mode for + * the specified range of bytes.
+ */ + RangeSet<Long> ranges = accessRanges.get(mode); + if (ranges == null) { + ranges = RangeSet.create(Long.class, true, false); + accessRanges.put(mode, ranges); + } + boolean add = ranges.add(lower, upper); + for (final RangeSet<Long> other : accessRanges.values()) { + if (other != null && other != ranges) { + add |= other.remove(lower, upper); + } + } + /* + * Update the visual component showing the position of last operation. + * An animation effect is used. + */ final double scale = columnWidth / fileSize; if (Double.isFinite(scale)) { if (add) { adjustSizes(scale, false); } - final Rectangle r; - if (cursor == null) { - r = new Rectangle(0, MARGIN_TOP, CURSOR_WIDTH, HEIGHT); - r.setArcWidth(CURSOR_WIDTH/2 - 1); - r.setArcHeight(HEIGHT/2 - 2); - r.setStroke(Color.ORANGE); - r.setFill(Color.YELLOW); - accessView.getChildren().add(r); - cursor = new FadeTransition(CURSOR_DURATION, r); - cursor.setOnFinished(this); - cursor.setFromValue(1); - cursor.setToValue(0); - } else { - r = (Rectangle) cursor.getNode(); + if (mode != Mode.CACHE) { + final Rectangle r; + if (cursor == null) { + r = new Rectangle(0, MARGIN_TOP, CURSOR_WIDTH, HEIGHT); + r.setArcWidth(CURSOR_WIDTH/2 - 1); + r.setArcHeight(HEIGHT/2 - 2); + r.setStroke(Color.ORANGE); + r.setFill(Color.YELLOW); + accessView.getChildren().add(r); + cursor = new FadeTransition(CURSOR_DURATION, r); + cursor.setOnFinished(this); + cursor.setFromValue(1); + cursor.setToValue(0); + } else { + r = (Rectangle) cursor.getNode(); + } + r.setX(Math.max(0, Math.min(scale*lower - CURSOR_WIDTH/2, columnWidth - CURSOR_WIDTH))); + cursor.playFromStart(); } - r.setX(Math.max(0, Math.min(scale*position - CURSOR_WIDTH/2, columnWidth - CURSOR_WIDTH))); - cursor.playFromStart(); } } @@ -287,36 +328,95 @@ final class FileAccessItem implements Runnable, EventHandler<ActionEvent> { /* * Adjust the position and width of all rectangles. 
*/ - for (final Range<Long> range : accessRanges) { - final long min = range.getMinValue(); - final long max = range.getMaxValue(); - final double x = scale * min; - final double width = scale * (max - min); - if (bars.hasNext()) { - final Rectangle r = (Rectangle) bars.next(); - if (resized || r.getX() + r.getWidth() >= x) { - r.setX(x); - r.setWidth(width); - continue; + for (final EnumMap.Entry<Mode, RangeSet<Long>> entry : accessRanges.entrySet()) { + final Mode mode = entry.getKey(); + for (final Range<Long> range : entry.getValue()) { + final long min = range.getMinValue(); + final long max = range.getMaxValue(); + final double x = scale * min; + final double width = scale * (max - min); + if (bars.hasNext()) { + final Rectangle r = (Rectangle) bars.next(); + if (resized || r.getX() + r.getWidth() >= x) { + r.setX(x); + r.setWidth(width); + mode.colorize(r); + continue; + } + /* + * Newly added range may have merged two or more ranges in a single one. + * Discard all ranges that are fully on the left side of current range. + * This is not really mandatory, but we do that in an effort to keep the + * most "relevant" rectangles (before change) for the new set of ranges. + */ + bars.remove(); } - /* - * Newly added range may have merged two or more ranges in a single one. - * Discard all ranges that are fully on the left side of current range. - * This is not really mandatory, but we do that in an effort to keep the - * most "relevant" rectangles (before change) for the new set of ranges. - */ - bars.remove(); + final Rectangle r = new Rectangle(x, MARGIN_TOP, width, HEIGHT); + r.setStrokeType(StrokeType.INSIDE); + mode.colorize(r); + bars.add(r); } - final Rectangle r = new Rectangle(x, MARGIN_TOP, width, HEIGHT); - r.setStrokeType(StrokeType.INSIDE); - r.setStroke(BORDER_COLOR); - r.setFill(FILL_COLOR); - bars.add(r); } // Remove all remaining children, if any. 
staticGroup.remove(bars.nextIndex(), staticGroup.size()); } + /** + * A range of bytes determined from the background thread and to be consumed in the JavaFX thread. + * This range can be updated as long as it has not been consumed. Those modifications reduce the + * amount of events to be consumed by the JavaFX thread. + */ + private final class NextAddRange implements Runnable { + /** Whether the range of bytes has been read, written or cached. */ + private final Mode mode; + + /** The range of bytes, modifiable as long as the event has not been consumed. */ + long lower, upper; + + /** Creates a new range of bytes for the given access mode. */ + NextAddRange(final Mode mode) { + this.mode = mode; + } + + /** Invoked in the JavaFX thread for saving the range in {@link #accessRanges}. */ + @Override public void run() { + synchronized (FileAccessItem.this) { + if (next == this) { + next = null; + } + } + addRange(lower, upper, mode); + } + } + + /** + * The next range of bytes to be merged into {@link #accessRanges}. + * Accesses to this field must be synchronized on {@code this}. + * The instance is created in a background thread and consumed in the JavaFX thread. + */ + private NextAddRange next; + + /** + * Reports a read or write operation on a range of bytes. + * This method should be invoked from a background thread. + * + * @param lower offset of the first byte read or written. + * @param count number of bytes read or written. + * @param mode whether a read, write or cache operation is performed. 
+ */ + private synchronized void addRangeLater(long lower, final long count, final Mode mode) { + long upper = Numerics.saturatingAdd(lower, count); + if (next != null && next.mode == mode && next.upper >= lower && next.lower <= upper) { + lower = Math.min(next.lower, lower); + upper = Math.max(next.upper, upper); + } else { + next = new NextAddRange(mode); + Platform.runLater(next); + } + next.lower = lower; + next.upper = upper; + } + /** * Wrapper around a {@link SeekableByteChannel} which will observe the ranges of bytes read or written. */ @@ -341,7 +441,7 @@ final class FileAccessItem implements Runnable, EventHandler<ActionEvent> { public int read(final ByteBuffer dst) throws IOException { final long position = position(); final int count = channel.read(dst); - Platform.runLater(() -> addRange(position, count, false)); + addRangeLater(position, count, Mode.READ); return count; } @@ -352,7 +452,7 @@ final class FileAccessItem implements Runnable, EventHandler<ActionEvent> { public int write(final ByteBuffer src) throws IOException { final long position = position(); final int count = channel.write(src); - Platform.runLater(() -> addRange(position, count, true)); + addRangeLater(position, count, Mode.WRITE); return count; } @@ -409,6 +509,130 @@ final class FileAccessItem implements Runnable, EventHandler<ActionEvent> { } } + /** + * Wrapper around an {@link InputStream} which will observe the ranges of bytes read. + * It can be used directly when the input is an {@link InputStream}, or indirectly + * when the channel observed by {@link Observer} is itself wrapping an input stream. + * In such case, the bytes read from the input stream are typically cached in some temporary file. + * + * <h2>Implementation note</h2> + * We do not extend {@link java.io.FilterInputStream} because we override almost all methods anyway. + * This implementation avoids using a non-final {@code volatile} field for the wrapped input stream. 
+ */ + final class InputObserver extends InputStream { + /** The source input stream. */ + private final InputStream in; + + /** The mode, either read or cache. */ + private final Mode mode; + + /** Position of the stream, current and marked. */ + private long position, mark; + + /** Creates a new observer for the given input stream. */ + InputObserver(final InputStream in) { + this.in = in; + this.mode = Mode.READ; + } + + /** Creates a new observer for the given input stream used as a cache. */ + InputObserver(final InputStream in, final long start) { + this.in = in; + this.mode = Mode.CACHE; + position = start; + } + + /** + * Declares that a range of bytes has been read. + * This method updates the rectangles in the JavaFX view. + * + * @param count number of bytes that have been read. + * @param mode the mode, usually {@link #mode}. + */ + private void range(final long count, final Mode mode) { + if (count > 0) { + addRangeLater(position, count, mode); + if (position > (position += count)) { + position = Long.MAX_VALUE; + } + } + } + + /** + * Declares that a range of bytes has been read. + * This method updates the rectangles in the JavaFX view. + * + * @param count number of bytes that have been read. + */ + private void range(final long count) { + range(count, mode); + } + + /** Returns the next byte or -1 on EOF. */ + @Override public int read() throws IOException { + final int b = in.read(); + if (b >= 0) range(1); + return b; + } + + /** Stores a sequence of bytes in the specified array. */ + @Override public int read(final byte[] b, final int off, int len) throws IOException { + range(len = in.read(b, off, len)); + return len; + } + + /** Stores a sequence of bytes in a newly allocated array. */ + @Override public byte[] readNBytes(final int len) throws IOException { + final byte[] b = in.readNBytes(len); + range(b.length); + return b; + } + + /** Stores a sequence of bytes in the specified output stream.
*/ + @Override public long transferTo(final OutputStream out) throws IOException { + final long n = in.transferTo(out); + range(n); + return n; + } + + /** Skips some bytes without reporting them as read. */ + @Override public long skip(long n) throws IOException { + range(n = in.skip(n), Mode.CACHE); + return n; + } + + /** Returns an estimate of the number of bytes that can be read. */ + @Override public int available() throws IOException { + return in.available(); + } + + /** Tells whether the input stream supports marks. */ + @Override public boolean markSupported() { + return in.markSupported(); + } + + /** Marks the current position in this input stream. */ + @Override public void mark(final int readlimit) { + in.mark(readlimit); + mark = position; + } + + /** Repositions this stream to the position of the last mark. */ + @Override public void reset() throws IOException { + in.reset(); + position = mark; + } + + /** Closes the wrapped input stream. */ + @Override public void close() throws IOException { + if (mode == Mode.READ) { + Platform.runLater(FileAccessItem.this); + // Otherwise will be removed by `Observer`. + } + in.close(); + } + } + /** * Invoked in JavaFX thread for removing this row from the table. 
*/ diff --git a/application/sis-javafx/src/main/java/org/apache/sis/internal/gui/io/FileAccessView.java b/application/sis-javafx/src/main/java/org/apache/sis/internal/gui/io/FileAccessView.java index 4404007173..0de8fef6d6 100644 --- a/application/sis-javafx/src/main/java/org/apache/sis/internal/gui/io/FileAccessView.java +++ b/application/sis-javafx/src/main/java/org/apache/sis/internal/gui/io/FileAccessView.java @@ -17,6 +17,7 @@ package org.apache.sis.internal.gui.io; import java.io.IOException; +import java.io.InputStream; import java.nio.channels.ReadableByteChannel; import java.nio.channels.SeekableByteChannel; import java.nio.channels.WritableByteChannel; @@ -32,6 +33,7 @@ import org.apache.sis.internal.gui.FixedHeaderColumnSize; import org.apache.sis.internal.gui.ImmutableObjectProperty; import org.apache.sis.internal.gui.Resources; import org.apache.sis.internal.storage.io.ChannelFactory; +import org.apache.sis.internal.storage.io.FileCacheByteChannel; import org.apache.sis.storage.DataStoreException; import org.apache.sis.storage.event.StoreListeners; @@ -45,7 +47,7 @@ import org.apache.sis.storage.event.StoreListeners; * in the vast majority of cases when user has no interest in those information.</p> * * @author Martin Desruisseaux (Geomatys) - * @version 1.2 + * @version 1.4 * @since 1.2 */ public final class FileAccessView extends Widget implements UnaryOperator<ChannelFactory> { @@ -113,6 +115,36 @@ public final class FileAccessView extends Widget implements UnaryOperator<Channe return factory.canOpen(); } + /** + * Returns a new visual item and adds it as a row in the table of opened files. + * This method can be invoked from any thread (usually not the JavaFX one). + * + * @param filename data store name. + * @return the view of the row added in the table. 
+ */ + private FileAccessItem newItem(final String filename) { + final FileAccessItem item = new FileAccessItem(table.getItems(), filename); + Platform.runLater(() -> item.owner.add(item)); + return item; + } + + /** + * Returns the readable channel as an input stream. + * + * @param filename data store name. + * @param listeners set of registered {@code StoreListener}s for the data store, or {@code null} if none. + * @return the input stream. + * @throws DataStoreException if the channel is read-once. + * @throws IOException if the input stream or its underlying byte channel cannot be created. + */ + @Override + public InputStream inputStream(final String filename, final StoreListeners listeners) + throws DataStoreException, IOException + { + final InputStream input = factory.inputStream(filename, listeners); + return newItem(filename).new InputObserver(input); + } + /** * Creates a readable channel and listens (if possible) read operations. * Current implementation listens only to {@link SeekableByteChannel} @@ -130,8 +162,11 @@ public final class FileAccessView extends Widget implements UnaryOperator<Channe { final ReadableByteChannel channel = factory.readable(filename, listeners); if (channel instanceof SeekableByteChannel) { - final FileAccessItem item = new FileAccessItem(table.getItems(), filename); - Platform.runLater(() -> item.owner.add(item)); + final FileAccessItem item = newItem(filename); + if (channel instanceof FileCacheByteChannel) { + ((FileCacheByteChannel) channel).setFilter((input, start, end) -> + item.new InputObserver(input, start)); + } return item.new Observer((SeekableByteChannel) channel); } return channel; diff --git a/application/sis-javafx/src/main/java/org/apache/sis/internal/gui/io/package-info.java b/application/sis-javafx/src/main/java/org/apache/sis/internal/gui/io/package-info.java index a929e6e171..2f26f52e0a 100644 --- a/application/sis-javafx/src/main/java/org/apache/sis/internal/gui/io/package-info.java +++ 
b/application/sis-javafx/src/main/java/org/apache/sis/internal/gui/io/package-info.java @@ -24,7 +24,7 @@ * may change in incompatible ways in any future version without notice. * * @author Martin Desruisseaux (Geomatys) - * @version 1.2 + * @version 1.4 * * @see org.apache.sis.internal.gui.DataStoreOpener * diff --git a/cloud/sis-cloud-aws/src/main/java/org/apache/sis/cloud/aws/s3/CachedByteChannel.java b/cloud/sis-cloud-aws/src/main/java/org/apache/sis/cloud/aws/s3/CachedByteChannel.java index 5d4b80845a..66ada480a6 100644 --- a/cloud/sis-cloud-aws/src/main/java/org/apache/sis/cloud/aws/s3/CachedByteChannel.java +++ b/cloud/sis-cloud-aws/src/main/java/org/apache/sis/cloud/aws/s3/CachedByteChannel.java @@ -18,7 +18,6 @@ package org.apache.sis.cloud.aws.s3; import java.util.List; import java.io.IOException; -import java.io.InputStream; import org.apache.sis.internal.storage.io.FileCacheByteChannel; import software.amazon.awssdk.services.s3.model.GetObjectRequest; import software.amazon.awssdk.services.s3.model.GetObjectResponse; @@ -86,9 +85,9 @@ final class CachedByteChannel extends FileCacheByteChannel { if (contentRange == null) { final Long contentLength = response.contentLength(); final long length = (contentLength != null) ? contentLength : -1; - return new Connection(stream, length, rangeUnits); + return new Connection(this, stream, length, rangeUnits); } else { - return new Connection(stream, contentRange, rangeUnits); + return new Connection(this, stream, contentRange, rangeUnits); } } catch (IllegalArgumentException e) { throw new IOException(e); @@ -99,18 +98,19 @@ final class CachedByteChannel extends FileCacheByteChannel { } /** - * Invoked when this channel is no longer interested in reading bytes from the specified stream. + * Invoked when this channel is no longer interested in reading bytes from the specified connection. * - * @param input the input stream to eventually close. 
- * @return whether the given input stream has been closed by this method. + * @param connection contains the input stream to eventually close. + * @return whether the input stream has been closed by this method. + * @throws IOException if an error occurred while closing the stream or preparing for next read operations. */ @Override - protected boolean abort(final InputStream input) throws IOException { - if (input instanceof Abortable) { - ((Abortable) input).abort(); + protected boolean abort(final Connection connection) throws IOException { + if (connection.rawInput instanceof Abortable) { + ((Abortable) connection.rawInput).abort(); return true; } else { - return super.abort(input); + return super.abort(connection); } } } diff --git a/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/ChannelFactory.java b/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/ChannelFactory.java index 0da1f94cab..87a533cd72 100644 --- a/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/ChannelFactory.java +++ b/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/ChannelFactory.java @@ -22,7 +22,6 @@ import java.util.HashSet; import java.util.Arrays; import java.util.logging.Level; import java.util.logging.LogRecord; -import java.util.function.UnaryOperator; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; @@ -132,36 +131,10 @@ public abstract class ChannelFactory { * If the URL is not encoded, then {@code null}. This argument is ignored if the given * input does not need to be converted from URL to {@code File}. * @param options the options to use for creating a new byte channel. Can be null or empty for read-only. - * @param wrapper a function for creating wrapper around the factory, or {@code null} if none. - * It can be used for installing listener or for transforming data on the fly. 
* @return the channel factory for the given input, or {@code null} if the given input is of unknown type. * @throws IOException if an error occurred while processing the given input. */ - public static ChannelFactory prepare( - final Object storage, final boolean allowWriteOnly, - final String encoding, final OpenOption[] options, - final UnaryOperator<ChannelFactory> wrapper) throws IOException - { - ChannelFactory factory = prepare(storage, allowWriteOnly, encoding, options); - if (factory != null && wrapper != null) { - factory = wrapper.apply(factory); - } - return factory; - } - - /** - * Returns a byte channel factory without wrappers, or {@code null} if unsupported. - * This method performs the same work than {@linkplain #prepare(Object, boolean, String, - * OpenOption[], UnaryOperator, UnaryOperator) above method}, but without wrappers. - * - * @param storage the stream or the file to open, or {@code null}. - * @param allowWriteOnly whether to allow wrapping {@link WritableByteChannel} and {@link OutputStream}. - * @param encoding if the input is an encoded URL, the character encoding (normally {@code "UTF-8"}). - * @param options the options to use for creating a new byte channel. Can be null or empty for read-only. - * @return the channel factory for the given input, or {@code null} if the given input is of unknown type. - * @throws IOException if an error occurred while processing the given input. 
- */ - private static ChannelFactory prepare(Object storage, final boolean allowWriteOnly, + public static ChannelFactory prepare(Object storage, final boolean allowWriteOnly, final String encoding, final OpenOption[] options) throws IOException { /* diff --git a/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/FileCacheByteChannel.java b/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/FileCacheByteChannel.java index 15fe5a29e6..5edbffec53 100644 --- a/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/FileCacheByteChannel.java +++ b/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/FileCacheByteChannel.java @@ -17,6 +17,7 @@ package org.apache.sis.internal.storage.io; import java.util.Collection; +import java.util.function.UnaryOperator; import java.io.EOFException; import java.io.IOException; import java.io.InputStream; @@ -86,8 +87,17 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel { /** The unit of ranges used in HTTP connections. */ private static final String RANGES_UNIT = "bytes"; - /** The input stream for reading the bytes. */ - final InputStream input; + /** + * The input stream without filtering, as specified at construction time. + * This is the same instance than {@link #input} when no filtering is applied. + */ + public final InputStream rawInput; + + /** + * The input stream for reading the bytes. It may be a wrapper around the input stream + * specified at construction time if {@linkplain #setFilter a filter has been set}. + */ + public final InputStream input; /** Position of the first byte read by the input stream (inclusive). */ final long start; @@ -104,6 +114,7 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel { /** * Creates information about a connection. * + * @param owner the channel which is opening this connection, or {@code null} if none. * @param input the input stream for reading the bytes. 
* @param start position of the first byte read by the input stream (inclusive). * @param end position of the last byte read by the input stream (inclusive). @@ -112,12 +123,15 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel { * * @see #openConnection(long, long) */ - public Connection(final InputStream input, final long start, final long end, final long length, final boolean acceptRanges) { - this.input = input; + public Connection(final FileCacheByteChannel owner, final InputStream input, + final long start, final long end, final long length, final boolean acceptRanges) + { + rawInput = input; this.start = start; this.end = end; this.length = length; this.acceptRanges = acceptRanges; + this.input = filter(owner, input); // Must be last. } /** @@ -125,17 +139,21 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel { * The "Content-Length" header value is useful to this class only if the connection was * opened for the full file. * + * @param owner the channel which is opening this connection, or {@code null} if none. * @param input the input stream for reading the bytes. * @param contentLength length of the response content, or -1 if unknown. * @param rangeUnits value of "Accept-Ranges" in HTTP header, which lists the accepted units. * @throws IllegalArgumentException if the start, end or length cannot be parsed. */ - public Connection(final InputStream input, final long contentLength, final Iterable<String> rangeUnits) { - this.input = input; + public Connection(final FileCacheByteChannel owner, final InputStream input, + final long contentLength, final Iterable<String> rangeUnits) + { + rawInput = input; this.start = 0; this.end = (contentLength > 0) ? 
contentLength - 1 : Long.MAX_VALUE; this.length = contentLength; acceptRanges = acceptRanges(rangeUnits); + this.input = filter(owner, input); } /** @@ -145,13 +163,16 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel { * * <p>Example of content range value: {@code "Content-Range: bytes 25000-75000/100000"}.</p> * + * @param owner the channel which is opening this connection, or {@code null} if none. * @param input the input stream for reading the bytes. * @param contentRange value of "Content-Range" in HTTP header. * @param rangeUnits value of "Accept-Ranges" in HTTP header, which lists the accepted units. * @throws IllegalArgumentException if the start, end or length cannot be parsed. */ - public Connection(final InputStream input, String contentRange, final Collection<String> rangeUnits) { - this.input = input; + public Connection(final FileCacheByteChannel owner, final InputStream input, + String contentRange, final Collection<String> rangeUnits) + { + rawInput = input; long contentLength = -1; contentRange = contentRange.trim(); int s = contentRange.indexOf(' '); @@ -175,6 +196,25 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel { * supported units did not changed. */ acceptRanges = rangeUnits.isEmpty() || acceptRanges(rangeUnits); + this.input = filter(owner, input); + } + + /** + * If an optional filter has been specified, applies it to the given input stream. + * This method should be invoked last in constructor, because it needs other fields. + * + * @param owner the channel which is opening a connection, or {@code null} if none. + * @param input the input stream created for a new connection. + * @return the filtered input stream, or {@code input} if there is no filtering.
+ * + * @see #setFilter(Filter) + */ + private InputStream filter(final FileCacheByteChannel owner, InputStream input) { + final Filter filter; + if (owner != null && (filter = owner.filter) != null) { + input = filter.apply(input, start, end); + } + return input; } /** @@ -235,6 +275,36 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel { */ private Connection connection; + /** + * An optional filter to apply on the input stream opened for each connection. + * A filter may be installed for example for being notified of the ranges of + * bytes that are read, or for transforming the data. + * + * @see #setFilter(Filter) + * @see java.io.FilterInputStream + * @see java.io.FilterOutputStream + */ + public interface Filter { + /** + * Invoked when an input stream is created for a new connection. + * + * @param input the input stream for the new connection. + * @param start position of the first byte to be returned by the input stream. + * @param end position (inclusive) of the last byte to be returned. + * @return the input stream to use for reading data. + * + * @see java.io.FilterInputStream + */ + InputStream apply(InputStream input, long start, long end); + } + + /** + * Optional filter to apply on the streams opened for a connection. + * + * @see #setFilter(Filter) + */ + private Filter filter; + /** * Input/output channel on the temporary or cached file where data are copied. * The {@linkplain FileChannel#position() position of this file channel} shall @@ -305,6 +375,19 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel { StandardOpenOption.DELETE_ON_CLOSE); } + /** + * Applies an optional filter on the streams opened for each new connection. + * A filter may be installed for example for being notified of the ranges of + * bytes that are read, or for transforming the data. + * + * @param filter a function which receives in argument the stream created + * for a new connection, and returns the stream to use.
+ * A {@code null} function removes filtering. + */ + public final void setFilter(final Filter filter) { + this.filter = filter; + } + /** * Returns the filename to use in error messages. * @@ -328,24 +411,24 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel { /** * Invoked when this channel is no longer interested in reading bytes from the specified stream. * This method is invoked for example when this channel needs to skip an arbitrarily large number - * of bytes because the {@linkplain #position(long) position changed}. The {@code input} argument - * is the value in the record returned by a previous call to {@link #openConnection(long, long)}. + * of bytes because the {@linkplain #position(long) position changed}. The {@code connection} + * argument is the value returned by a previous call to {@link #openConnection(long, long)}. * The boolean return value tells what this method has done: * * <ul class="verbose"> - * <li>If this method returns {@code true}, then the given stream has been closed by this method and this + * <li>If this method returns {@code true}, then the input stream has been closed by this method and this * channel is ready to create a new stream on the next call to {@link #openConnection(long, long)}.</li> - * <li>If this method returns {@code false}, then the given stream is still alive and should continue to be used. + * <li>If this method returns {@code false}, then the input stream is still alive and should continue to be used. * The {@link #openConnection(long, long)} method will <em>not</em> be invoked. * Instead, bytes will be skipped by reading them from the current input stream and caching them.</li> * </ul> * - * @param input the input stream to eventually close. - * @return whether the given input stream has been closed by this method. If {@code false}, + * @param connection container of the input stream to eventually close. + * @return whether the input stream has been closed by this method.
If {@code false}, * then this channel should continue to use that input stream instead of opening a new connection. * @throws IOException if an error occurred while closing the stream or preparing for next read operations. */ - protected boolean abort(InputStream input) throws IOException { + protected boolean abort(Connection connection) throws IOException { return false; } @@ -673,7 +756,7 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel { cache(buffer.limit(n)); count += n; } - if (abort(input)) { + if (abort(connection)) { connection = null; } return count; @@ -729,7 +812,7 @@ public abstract class FileCacheByteChannel implements SeekableByteChannel { transfer = null; idleHandler = null; try (file) { - if (c != null && !abort(c.input)) { + if (c != null && !abort(c)) { c.input.close(); } } diff --git a/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/HttpByteChannel.java b/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/HttpByteChannel.java index b9e91523a0..47d2ef6664 100644 --- a/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/HttpByteChannel.java +++ b/storage/sis-storage/src/main/java/org/apache/sis/internal/storage/io/HttpByteChannel.java @@ -122,9 +122,9 @@ final class HttpByteChannel extends FileCacheByteChannel { try { if (range == null) { final long length = headers.firstValueAsLong("Content-Length").orElse(-1); - return new Connection(stream, length, rangeUnits); + return new Connection(this, stream, length, rangeUnits); } else { - return new Connection(stream, range, rangeUnits); + return new Connection(this, stream, range, rangeUnits); } } catch (IllegalArgumentException e) { throw new IOException(e); @@ -132,15 +132,15 @@ final class HttpByteChannel extends FileCacheByteChannel { } /** - * Invoked when this channel is no longer interested in reading bytes from the specified stream. 
+ * Invoked when this channel is no longer interested in reading bytes from the specified connection. * - * @param input the input stream to eventually close. - * @return whether the given input stream has been closed by this method. + * @param connection contains the input stream to eventually close. + * @return whether the input stream has been closed by this method. * @throws IOException if an error occurred while closing the stream or preparing for next read operations. */ @Override - protected boolean abort(final InputStream input) throws IOException { - input.close(); + protected boolean abort(final Connection connection) throws IOException { + connection.input.close(); return true; } } diff --git a/storage/sis-storage/src/main/java/org/apache/sis/storage/StorageConnector.java b/storage/sis-storage/src/main/java/org/apache/sis/storage/StorageConnector.java index 808db94167..269ba41729 100644 --- a/storage/sis-storage/src/main/java/org/apache/sis/storage/StorageConnector.java +++ b/storage/sis-storage/src/main/java/org/apache/sis/storage/StorageConnector.java @@ -19,6 +19,7 @@ package org.apache.sis.storage; import java.util.Map; import java.util.Iterator; import java.util.IdentityHashMap; +import java.util.function.UnaryOperator; import java.io.Reader; import java.io.DataInput; import java.io.DataOutput; @@ -35,6 +36,7 @@ import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritableByteChannel; import java.nio.channels.SeekableByteChannel; import java.nio.file.NoSuchFileException; +import java.nio.file.OpenOption; import javax.imageio.stream.ImageInputStream; import javax.imageio.stream.ImageOutputStream; import javax.imageio.IIOException; @@ -985,10 +987,7 @@ public class StorageConnector implements Serializable { * URL, URI, File, Path or other types that may be added in future Apache SIS versions. * If the given storage is already a ReadableByteChannel, then the factory will return it as-is. 
*/ - final ChannelFactory factory = ChannelFactory.prepare(storage, false, - getOption(OptionKey.URL_ENCODING), - getOption(OptionKey.OPEN_OPTIONS), - getOption(InternalOptionKey.CHANNEL_FACTORY_WRAPPER)); + final ChannelFactory factory = createChannelFactory(false); if (factory == null) { return null; } @@ -1309,6 +1308,25 @@ public class StorageConnector implements Serializable { addView(type, view, null, (byte) 0); } + /** + * Returns a byte channel factory from the storage, or {@code null} if the storage is unsupported. + * See {@link ChannelFactory#prepare(Object, boolean, String, OpenOption[])} for more information. + * + * @param allowWriteOnly whether to allow wrapping {@link WritableByteChannel} and {@link OutputStream}. + * @return the channel factory for the given input, or {@code null} if the given input is of unknown type. + * @throws IOException if an error occurred while processing the given input. + */ + private ChannelFactory createChannelFactory(final boolean allowWriteOnly) throws IOException { + ChannelFactory factory = ChannelFactory.prepare(storage, allowWriteOnly, + getOption(OptionKey.URL_ENCODING), + getOption(OptionKey.OPEN_OPTIONS)); + final UnaryOperator<ChannelFactory> wrapper = getOption(InternalOptionKey.CHANNEL_FACTORY_WRAPPER); + if (factory != null && wrapper != null) { + factory = wrapper.apply(factory); + } + return factory; + } + /** * Creates a view for the storage as a {@link ChannelDataOutput} if possible. * This code is a partial copy of {@link #createDataInput()} adapted for output. @@ -1328,10 +1346,7 @@ public class StorageConnector implements Serializable { * URL, URI, File, Path or other types that may be added in future Apache SIS versions. * If the given storage is already a WritableByteChannel, then the factory will return it as-is. 
*/ - final ChannelFactory factory = ChannelFactory.prepare(storage, true, - getOption(OptionKey.URL_ENCODING), - getOption(OptionKey.OPEN_OPTIONS), - getOption(InternalOptionKey.CHANNEL_FACTORY_WRAPPER)); + final ChannelFactory factory = createChannelFactory(true); if (factory == null) { return null; } diff --git a/storage/sis-storage/src/test/java/org/apache/sis/internal/storage/io/FileCacheByteChannelTest.java b/storage/sis-storage/src/test/java/org/apache/sis/internal/storage/io/FileCacheByteChannelTest.java index 5da0e3dce7..441cae8bea 100644 --- a/storage/sis-storage/src/test/java/org/apache/sis/internal/storage/io/FileCacheByteChannelTest.java +++ b/storage/sis-storage/src/test/java/org/apache/sis/internal/storage/io/FileCacheByteChannelTest.java @@ -99,15 +99,15 @@ public final class FileCacheByteChannelTest extends TestCase { end = Math.min(end + random.nextInt(40), length); } while (start >= end); var input = new ComputedInputStream(Math.toIntExact(start), Math.toIntExact(end), random); - return new Connection(input, start, end-1, length, true); + return new Connection(null, input, start, end-1, length, true); } /** - * Marks the given input stream as closed and notify that a new one can be created. + * Marks the input stream as closed and notifies that a new one can be created.
*/ @Override - protected boolean abort(final InputStream input) throws IOException { - input.close(); + protected boolean abort(final Connection connection) throws IOException { + connection.input.close(); return true; } @@ -211,23 +211,23 @@ public final class FileCacheByteChannelTest extends TestCase { public void testParseRange() { final List<String> rangesUnit = List.of("bytes"); FileCacheByteChannel.Connection c; - c = new FileCacheByteChannel.Connection(null, "bytes 25000-75000/100000", rangesUnit); + c = new FileCacheByteChannel.Connection(null, null, "bytes 25000-75000/100000", rangesUnit); assertEquals( 25000, c.start); assertEquals( 75000, c.end); assertEquals(100000, c.length); - c = new FileCacheByteChannel.Connection(null, "bytes 25000-75000", rangesUnit); + c = new FileCacheByteChannel.Connection(null, null, "bytes 25000-75000", rangesUnit); assertEquals( 25000, c.start); assertEquals( 75000, c.end); assertEquals( -1, c.length); - c = new FileCacheByteChannel.Connection(null, "bytes 25000/100000", rangesUnit); + c = new FileCacheByteChannel.Connection(null, null, "bytes 25000/100000", rangesUnit); assertEquals( 25000, c.start); assertEquals(100000, c.end); assertEquals(100000, c.length); // Not legal, but we test robustness. - c = new FileCacheByteChannel.Connection(null, "25000", rangesUnit); + c = new FileCacheByteChannel.Connection(null, null, "25000", rangesUnit); assertEquals( 25000, c.start); assertEquals( -1, c.end); assertEquals( -1, c.length);