This is an automated email from the ASF dual-hosted git repository.

gaborgsomogyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d6dff7f76e [FLINK-39533][s3] Use abort() instead of drain on 
close/seek when remaining bytes exceed threshold in NativeS3InputStream
3d6dff7f76e is described below

commit 3d6dff7f76ee4da1bc9e095a311d9055780222da
Author: Samrat <[email protected]>
AuthorDate: Mon Apr 27 21:25:52 2026 +0530

    [FLINK-39533][s3] Use abort() instead of drain on close/seek when remaining 
bytes exceed threshold in NativeS3InputStream
    
    * [FLINK-39533][s3] Use abort() instead of drain on close/seek when 
remaining bytes exceed threshold in NativeS3InputStream
    
    * [FLINK-39533][s3] Address to review comments. Bug at the end of stream 
and addres to s3 returning 416
---
 .../flink/fs/s3native/NativeS3InputStream.java     | 166 ++++++----
 .../flink/fs/s3native/NativeS3InputStreamTest.java | 366 +++++++++++++++++++++
 2 files changed, 465 insertions(+), 67 deletions(-)

diff --git 
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3InputStream.java
 
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3InputStream.java
index f6dca047f64..32368dd94f6 100644
--- 
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3InputStream.java
+++ 
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3InputStream.java
@@ -27,7 +27,10 @@ import software.amazon.awssdk.services.s3.S3Client;
 import software.amazon.awssdk.services.s3.model.GetObjectRequest;
 import software.amazon.awssdk.services.s3.model.GetObjectResponse;
 
+import javax.annotation.concurrent.GuardedBy;
+
 import java.io.BufferedInputStream;
+import java.io.EOFException;
 import java.io.IOException;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -56,9 +59,16 @@ class NativeS3InputStream extends FSDataInputStream {
     private final long contentLength;
     private final int readBufferSize;
 
+    @GuardedBy("lock")
     private ResponseInputStream<GetObjectResponse> currentStream;
+
+    @GuardedBy("lock")
     private BufferedInputStream bufferedStream;
+
+    @GuardedBy("lock")
     private long position;
+
+    @GuardedBy("lock")
     private volatile boolean closed;
 
     public NativeS3InputStream(
@@ -88,12 +98,28 @@ class NativeS3InputStream extends FSDataInputStream {
                 this.readBufferSize / 1024);
     }
 
+    @GuardedBy("lock")
     private void lazyInitialize() throws IOException {
+        assert lock.isHeldByCurrentThread() : "lazyInitialize() requires lock 
to be held";
         if (currentStream == null && !closed) {
             openStreamAtCurrentPosition();
         }
     }
 
+    /** At EOF, release instead of reopening: {@code bytes=contentLength-} 
returns S3 416. */
+    @GuardedBy("lock")
+    private void repositionOpenStream() throws IOException {
+        assert lock.isHeldByCurrentThread() : "repositionOpenStream() requires 
lock to be held";
+        if (currentStream == null) {
+            return;
+        }
+        if (position >= contentLength) {
+            releaseStreams();
+        } else {
+            openStreamAtCurrentPosition();
+        }
+    }
+
     /**
      * Opens (or reopens) the S3 stream at the current position.
      *
@@ -108,24 +134,7 @@ class NativeS3InputStream extends FSDataInputStream {
     private void openStreamAtCurrentPosition() throws IOException {
         lock.lock();
         try {
-            if (bufferedStream != null) {
-                try {
-                    bufferedStream.close();
-                } catch (IOException e) {
-                    LOG.warn("Error closing buffered stream for {}/{}", 
bucketName, key, e);
-                } finally {
-                    bufferedStream = null;
-                }
-            }
-            if (currentStream != null) {
-                try {
-                    currentStream.close();
-                } catch (IOException e) {
-                    LOG.warn("Error closing S3 response stream for {}/{}", 
bucketName, key, e);
-                } finally {
-                    currentStream = null;
-                }
-            }
+            releaseStreams();
 
             try {
                 GetObjectRequest.Builder requestBuilder =
@@ -143,20 +152,7 @@ class NativeS3InputStream extends FSDataInputStream {
                 currentStream = s3Client.getObject(requestBuilder.build());
                 bufferedStream = new BufferedInputStream(currentStream, 
readBufferSize);
             } catch (Exception e) {
-                if (bufferedStream != null) {
-                    try {
-                        bufferedStream.close();
-                    } catch (IOException ignored) {
-                    }
-                    bufferedStream = null;
-                }
-                if (currentStream != null) {
-                    try {
-                        currentStream.close();
-                    } catch (IOException ignored) {
-                    }
-                    currentStream = null;
-                }
+                releaseStreams();
                 throw new IOException("Failed to open S3 stream for " + 
bucketName + "/" + key, e);
             }
         } finally {
@@ -164,6 +160,64 @@ class NativeS3InputStream extends FSDataInputStream {
         }
     }
 
+    /**
+     * Aborts the in-flight HTTP connection so that subsequent {@code close()} 
calls on the stream
+     * do not drain remaining bytes over the network.
+     *
+     * @see ResponseInputStream#abort()
+     */
+    @GuardedBy("lock")
+    private void abortCurrentStream() {
+        assert lock.isHeldByCurrentThread() : "abortCurrentStream() requires 
lock to be held";
+        if (currentStream != null) {
+            try {
+                currentStream.abort();
+            } catch (RuntimeException e) {
+                LOG.warn("Error aborting S3 response stream for {}/{}", 
bucketName, key, e);
+            }
+        }
+    }
+
+    /**
+     * Aborts and closes both streams, nulling the references. The abort is 
called first to prevent
+     * {@link ResponseInputStream#close()} from draining remaining bytes over 
the network.
+     *
+     * @return the first {@link IOException} encountered (with subsequent ones 
added as suppressed),
+     *     or {@code null} if cleanup succeeded without errors
+     */
+    @GuardedBy("lock")
+    private IOException releaseStreams() {
+        assert lock.isHeldByCurrentThread() : "releaseStreams() requires lock 
to be held";
+        abortCurrentStream();
+        IOException exception = null;
+
+        if (bufferedStream != null) {
+            try {
+                bufferedStream.close();
+            } catch (IOException e) {
+                exception = e;
+                LOG.warn("Error closing buffered stream for {}/{}", 
bucketName, key, e);
+            } finally {
+                bufferedStream = null;
+            }
+        }
+        if (currentStream != null) {
+            try {
+                currentStream.close();
+            } catch (IOException e) {
+                if (exception == null) {
+                    exception = e;
+                } else {
+                    exception.addSuppressed(e);
+                }
+                LOG.warn("Error closing S3 response stream for {}/{}", 
bucketName, key, e);
+            } finally {
+                currentStream = null;
+            }
+        }
+        return exception;
+    }
+
     @Override
     public void seek(long desired) throws IOException {
         lock();
@@ -172,14 +226,19 @@ class NativeS3InputStream extends FSDataInputStream {
                 throw new IOException("Stream is closed");
             }
             if (desired < 0) {
-                throw new IOException("Cannot seek to negative position: " + 
desired);
+                throw new EOFException("Cannot seek to negative position: " + 
desired);
+            }
+            if (desired > contentLength) {
+                throw new EOFException(
+                        "Cannot seek past end of stream: position="
+                                + desired
+                                + ", length="
+                                + contentLength);
             }
 
             if (desired != position) {
                 position = desired;
-                if (currentStream != null) {
-                    openStreamAtCurrentPosition();
-                }
+                repositionOpenStream();
             }
         } finally {
             lock.unlock();
@@ -203,10 +262,10 @@ class NativeS3InputStream extends FSDataInputStream {
             if (closed) {
                 throw new IOException("Stream is closed");
             }
-            lazyInitialize();
             if (position >= contentLength) {
                 return -1;
             }
+            lazyInitialize();
             int data = bufferedStream.read();
             if (data != -1) {
                 position++;
@@ -236,10 +295,10 @@ class NativeS3InputStream extends FSDataInputStream {
             if (closed) {
                 throw new IOException("Stream is closed");
             }
-            lazyInitialize();
             if (position >= contentLength) {
                 return -1;
             }
+            lazyInitialize();
             long remaining = contentLength - position;
             int toRead = (int) Math.min(len, remaining);
             int bytesRead = bufferedStream.read(b, off, toRead);
@@ -270,33 +329,8 @@ class NativeS3InputStream extends FSDataInputStream {
             }
 
             closed = true;
-            IOException exception = null;
-
-            if (bufferedStream != null) {
-                try {
-                    bufferedStream.close();
-                } catch (IOException e) {
-                    exception = e;
-                    LOG.warn("Error closing buffered stream for {}/{}", 
bucketName, key, e);
-                } finally {
-                    bufferedStream = null;
-                }
-            }
 
-            if (currentStream != null) {
-                try {
-                    currentStream.close();
-                } catch (IOException e) {
-                    if (exception == null) {
-                        exception = e;
-                    } else {
-                        exception.addSuppressed(e);
-                    }
-                    LOG.warn("Error closing S3 response stream for {}/{}", 
bucketName, key, e);
-                } finally {
-                    currentStream = null;
-                }
-            }
+            IOException exception = releaseStreams();
 
             LOG.debug(
                     "Closed S3 input stream - bucket: {}, key: {}, final 
position: {}/{}",
@@ -350,9 +384,7 @@ class NativeS3InputStream extends FSDataInputStream {
             long skipped = newPos - position;
             if (newPos != position) {
                 position = newPos;
-                if (currentStream != null) {
-                    openStreamAtCurrentPosition();
-                }
+                repositionOpenStream();
             }
             return skipped;
         } finally {
diff --git 
a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3InputStreamTest.java
 
b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3InputStreamTest.java
new file mode 100644
index 00000000000..90591fcd94a
--- /dev/null
+++ 
b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3InputStreamTest.java
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native;
+
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.core.ResponseInputStream;
+import software.amazon.awssdk.http.Abortable;
+import software.amazon.awssdk.http.AbortableInputStream;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectResponse;
+
+import java.io.ByteArrayInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.time.Duration;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link NativeS3InputStream}. */
+class NativeS3InputStreamTest {
+
+    private static final String BUCKET = "test-bucket";
+    private static final String KEY = "test-key";
+
+    private static final byte[] DATA;
+
+    static {
+        DATA = new byte[256];
+        for (int i = 0; i < DATA.length; i++) {
+            DATA[i] = (byte) i;
+        }
+    }
+
+    private static class TrackingInputStream extends InputStream implements 
Abortable {
+        private final ByteArrayInputStream delegate;
+        private final AtomicBoolean aborted = new AtomicBoolean();
+        private final AtomicBoolean closed = new AtomicBoolean();
+        private volatile boolean abortedBeforeClose;
+
+        TrackingInputStream(byte[] data, int offset) {
+            this.delegate = new ByteArrayInputStream(data, offset, data.length 
- offset);
+        }
+
+        TrackingInputStream(byte[] data) {
+            this(data, 0);
+        }
+
+        @Override
+        public int read() {
+            return delegate.read();
+        }
+
+        @Override
+        public int read(byte[] b, int off, int len) {
+            return delegate.read(b, off, len);
+        }
+
+        @Override
+        public void abort() {
+            aborted.set(true);
+        }
+
+        @Override
+        public void close() throws IOException {
+            if (aborted.get()) {
+                abortedBeforeClose = true;
+            }
+            closed.set(true);
+            delegate.close();
+        }
+
+        boolean wasAborted() {
+            return aborted.get();
+        }
+
+        boolean wasClosed() {
+            return closed.get();
+        }
+
+        boolean wasAbortedBeforeClose() {
+            return abortedBeforeClose;
+        }
+    }
+
+    /** {@link S3Client} stub. */
+    private static final class StubS3Client implements S3Client {
+        private final byte[] data;
+        private final AtomicInteger getObjectCalls = new AtomicInteger();
+        private volatile TrackingInputStream lastStream;
+
+        StubS3Client(byte[] data) {
+            this.data = data;
+        }
+
+        @Override
+        public ResponseInputStream<GetObjectResponse> 
getObject(GetObjectRequest request) {
+            getObjectCalls.incrementAndGet();
+            int offset = 0;
+            String range = request.range();
+            if (range != null && range.startsWith("bytes=")) {
+                offset = Integer.parseInt(range.substring(6, 
range.indexOf('-')));
+            }
+            TrackingInputStream tracking = new TrackingInputStream(data, 
offset);
+            lastStream = tracking;
+            AbortableInputStream abortable = 
AbortableInputStream.create(tracking, tracking);
+            return new ResponseInputStream<>(
+                    GetObjectResponse.builder().build(), abortable, 
Duration.ZERO);
+        }
+
+        @Override
+        public String serviceName() {
+            return "s3";
+        }
+
+        @Override
+        public void close() {}
+
+        TrackingInputStream lastStream() {
+            return lastStream;
+        }
+
+        int getObjectCalls() {
+            return getObjectCalls.get();
+        }
+    }
+
+    @Test
+    void closeAbortsUnderlyingStream() throws Exception {
+        StubS3Client client = new StubS3Client(DATA);
+        try (NativeS3InputStream in = new NativeS3InputStream(client, BUCKET, 
KEY, DATA.length)) {
+            assertThat(in.read()).isEqualTo(0);
+            assertThat(in.getPos()).isEqualTo(1);
+        }
+        assertThat(client.lastStream().wasAborted()).isTrue();
+    }
+
+    @Test
+    void closeAbortsAndThenClosesUnderlyingStream() throws Exception {
+        StubS3Client client = new StubS3Client(DATA);
+        try (NativeS3InputStream in = new NativeS3InputStream(client, BUCKET, 
KEY, DATA.length)) {
+            in.read();
+        }
+        TrackingInputStream stream = client.lastStream();
+        // abort() must be called to kill the HTTP connection (prevents drain)
+        assertThat(stream.wasAborted()).isTrue();
+        // close() must still be called for SDK resource cleanup (connection 
pool return, etc.)
+        assertThat(stream.wasClosed()).isTrue();
+        // abort() must happen BEFORE close() - otherwise close() drains 
remaining bytes
+        assertThat(stream.wasAbortedBeforeClose()).isTrue();
+    }
+
+    @Test
+    void seekAbortsAndClosesOldStreamBeforeOpeningNew() throws Exception {
+        StubS3Client client = new StubS3Client(DATA);
+        try (NativeS3InputStream in = new NativeS3InputStream(client, BUCKET, 
KEY, DATA.length)) {
+            in.read();
+            TrackingInputStream first = client.lastStream();
+
+            in.seek(100);
+
+            // old stream must be aborted, closed, and in the correct order
+            assertThat(first.wasAborted()).isTrue();
+            assertThat(first.wasClosed()).isTrue();
+            assertThat(first.wasAbortedBeforeClose()).isTrue();
+            assertThat(client.getObjectCalls()).isEqualTo(2);
+            assertThat(in.getPos()).isEqualTo(100);
+            in.seek(100);
+            assertThat(client.getObjectCalls()).isEqualTo(2);
+        }
+    }
+
+    @Test
+    void skipAbortsOldStreamAndOpensNew() throws Exception {
+        StubS3Client client = new StubS3Client(DATA);
+        try (NativeS3InputStream in = new NativeS3InputStream(client, BUCKET, 
KEY, DATA.length)) {
+            in.read();
+            TrackingInputStream first = client.lastStream();
+            assertThat(in.skip(100)).isEqualTo(100);
+            assertThat(first.wasAborted()).isTrue();
+            assertThat(first.wasClosed()).isTrue();
+            assertThat(first.wasAbortedBeforeClose()).isTrue();
+            assertThat(client.getObjectCalls()).isEqualTo(2);
+            // skip(0) and skip(negative) are no-ops
+            assertThat(in.skip(0)).isZero();
+            assertThat(in.skip(-5)).isZero();
+            assertThat(client.getObjectCalls()).isEqualTo(2);
+        }
+    }
+
+    @Test
+    void closeWithoutReadNeverOpensStream() throws Exception {
+        StubS3Client client = new StubS3Client(DATA);
+        try (NativeS3InputStream in = new NativeS3InputStream(client, BUCKET, 
KEY, DATA.length)) {
+            // lazy init means no getObject call
+        }
+        assertThat(client.getObjectCalls()).isEqualTo(0);
+    }
+
+    @Test
+    void doubleCloseIsIdempotent() throws Exception {
+        StubS3Client client = new StubS3Client(DATA);
+        NativeS3InputStream in = new NativeS3InputStream(client, BUCKET, KEY, 
DATA.length);
+        in.read();
+
+        // Verify state after first close
+        in.close();
+        assertThat(client.getObjectCalls()).isEqualTo(1);
+        assertThat(client.lastStream().wasAborted()).isTrue();
+
+        // Second close should be a no-op
+        in.close();
+        assertThat(client.getObjectCalls()).isEqualTo(1);
+        assertThat(client.lastStream().wasAborted()).isTrue();
+    }
+
+    @Test
+    void seekBeforeFirstReadUpdatesPositionOnly() throws Exception {
+        StubS3Client client = new StubS3Client(DATA);
+        try (NativeS3InputStream in = new NativeS3InputStream(client, BUCKET, 
KEY, DATA.length)) {
+            in.seek(50);
+            assertThat(in.getPos()).isEqualTo(50);
+            assertThat(client.getObjectCalls()).isEqualTo(0);
+        }
+    }
+
+    @Test
+    void readAndSeekReturnCorrectData() throws Exception {
+        StubS3Client client = new StubS3Client(DATA);
+        try (NativeS3InputStream in = new NativeS3InputStream(client, BUCKET, 
KEY, DATA.length)) {
+            // single-byte read
+            assertThat(in.read()).isEqualTo(0);
+            assertThat(in.getPos()).isEqualTo(1);
+            // bulk read returns correct bytes and advances position
+            byte[] buf = new byte[10];
+            assertThat(in.read(buf, 0, 10)).isEqualTo(10);
+            assertThat(in.getPos()).isEqualTo(11);
+            for (int i = 0; i < 10; i++) {
+                assertThat(buf[i]).isEqualTo(DATA[i + 1]);
+            }
+            // available() reflects remaining bytes
+            assertThat(in.available()).isEqualTo(DATA.length - 11);
+            // seek then read returns data at the seeked position
+            in.seek(200);
+            assertThat(in.read()).isEqualTo(200);
+            assertThat(in.getPos()).isEqualTo(201);
+            // partial read at EOF returns only remaining bytes
+            in.seek(250);
+            byte[] tail = new byte[20];
+            assertThat(in.read(tail, 0, 20)).isEqualTo(6);
+            assertThat(in.getPos()).isEqualTo(256);
+            // read past EOF
+            assertThat(in.read()).isEqualTo(-1);
+            assertThat(in.read(new byte[1], 0, 1)).isEqualTo(-1);
+        }
+    }
+
+    @Test
+    void seekPastEofThrowsEofException() throws Exception {
+        StubS3Client client = new StubS3Client(DATA);
+        try (NativeS3InputStream in = new NativeS3InputStream(client, BUCKET, 
KEY, DATA.length)) {
+            assertThatThrownBy(() -> in.seek(DATA.length + 1))
+                    .isInstanceOf(EOFException.class)
+                    .hasMessageContaining("past end of stream");
+            in.seek(DATA.length);
+            assertThat(in.getPos()).isEqualTo(DATA.length);
+            assertThat(client.getObjectCalls()).isEqualTo(0);
+        }
+    }
+
+    @Test
+    void readAtEofReturnsMinusOne() throws Exception {
+        StubS3Client client = new StubS3Client(DATA);
+        try (NativeS3InputStream in = new NativeS3InputStream(client, BUCKET, 
KEY, DATA.length)) {
+            in.seek(DATA.length);
+            assertThat(in.read()).isEqualTo(-1);
+            assertThat(in.read(new byte[8], 0, 8)).isEqualTo(-1);
+            assertThat(in.getPos()).isEqualTo(DATA.length);
+            assertThat(in.available()).isZero();
+            // EOF short-circuits before lazyInitialize, so no range request 
is issued.
+            assertThat(client.getObjectCalls()).isZero();
+        }
+    }
+
+    @Test
+    void seekToContentLengthWithOpenStreamReleasesStream() throws Exception {
+        StubS3Client client = new StubS3Client(DATA);
+        try (NativeS3InputStream in = new NativeS3InputStream(client, BUCKET, 
KEY, DATA.length)) {
+            in.read();
+            TrackingInputStream first = client.lastStream();
+            assertThat(client.getObjectCalls()).isEqualTo(1);
+
+            in.seek(DATA.length);
+
+            assertThat(first.wasAborted()).isTrue();
+            assertThat(first.wasClosed()).isTrue();
+            assertThat(first.wasAbortedBeforeClose()).isTrue();
+            // bytes=contentLength- is unsatisfiable, so we must not reopen.
+            assertThat(client.getObjectCalls()).isEqualTo(1);
+            assertThat(in.read()).isEqualTo(-1);
+            assertThat(in.read(new byte[4], 0, 4)).isEqualTo(-1);
+            assertThat(client.getObjectCalls()).isEqualTo(1);
+        }
+    }
+
+    @Test
+    void skipToEofWithOpenStreamReleasesStream() throws Exception {
+        StubS3Client client = new StubS3Client(DATA);
+        try (NativeS3InputStream in = new NativeS3InputStream(client, BUCKET, 
KEY, DATA.length)) {
+            in.read();
+            TrackingInputStream first = client.lastStream();
+            assertThat(client.getObjectCalls()).isEqualTo(1);
+
+            assertThat(in.skip(DATA.length)).isEqualTo(DATA.length - 1);
+            assertThat(in.getPos()).isEqualTo(DATA.length);
+
+            assertThat(first.wasAborted()).isTrue();
+            assertThat(first.wasClosed()).isTrue();
+            assertThat(first.wasAbortedBeforeClose()).isTrue();
+            assertThat(client.getObjectCalls()).isEqualTo(1);
+            assertThat(in.read()).isEqualTo(-1);
+        }
+    }
+
+    @Test
+    void rejectsInvalidArguments() throws Exception {
+        StubS3Client client = new StubS3Client(DATA);
+        try (NativeS3InputStream in = new NativeS3InputStream(client, BUCKET, 
KEY, DATA.length)) {
+            assertThatThrownBy(() -> in.read(null, 0, 
1)).isInstanceOf(NullPointerException.class);
+            assertThatThrownBy(() -> in.read(new byte[5], -1, 1))
+                    .isInstanceOf(IndexOutOfBoundsException.class);
+            assertThatThrownBy(() -> in.read(new byte[5], 0, 6))
+                    .isInstanceOf(IndexOutOfBoundsException.class);
+            assertThatThrownBy(() -> in.seek(-1))
+                    .isInstanceOf(EOFException.class)
+                    .hasMessageContaining("negative");
+            assertThat(in.read(new byte[5], 0, 0)).isZero();
+        }
+        NativeS3InputStream closed = new NativeS3InputStream(client, BUCKET, 
KEY, DATA.length);
+        closed.close();
+        assertThatThrownBy(closed::read).isInstanceOf(IOException.class);
+        assertThatThrownBy(() -> closed.read(new byte[1], 0, 
1)).isInstanceOf(IOException.class);
+        assertThatThrownBy(() -> 
closed.seek(0)).isInstanceOf(IOException.class);
+        assertThatThrownBy(closed::available).isInstanceOf(IOException.class);
+    }
+}

Reply via email to