[ https://issues.apache.org/jira/browse/HADOOP-19220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Steve Loughran resolved HADOOP-19220.
-------------------------------------
    Resolution: Works for Me

It works for me, and for the people whose support calls would ruin my life 
if it didn't work for them.

You have probably done something in your mocking test setup that does not 
match what the s3a filesystem actually does. My recommendation: step through 
the failing test with a debugger.
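
One common mismatch of exactly this shape: S3AInputStream re-opens the object 
with a ranged GET when it seeks, so a mocked getObject() that ignores the 
range on the request hands back a stream that always starts at byte 0, and a 
positioned read at offset 5 of "test_content" comes back as "test" rather 
than "cont". A minimal sketch of a range-aware mock callback, as a drop-in 
for the getObject() override in the test's InputStreamCallbacks (illustrative 
and untested; the parsing assumes both range bounds are present, as s3a sets 
them):

    @Override
    public ResponseInputStream<GetObjectResponse> getObject(GetObjectRequest request) {
        byte[] all = INPUT.getBytes(StandardCharsets.UTF_8);
        int start = 0;
        int end = all.length - 1;          // inclusive, HTTP Range semantics
        String range = request.range();    // e.g. "bytes=5-8"; null if no range was set
        if (range != null && range.startsWith("bytes=")) {
            String[] bounds = range.substring("bytes=".length()).split("-");
            start = Integer.parseInt(bounds[0]);
            end = Math.min(Integer.parseInt(bounds[1]), all.length - 1);
        }
        // serve only the requested slice, the way a real S3 GET would
        byte[] slice = java.util.Arrays.copyOfRange(all, start, end + 1);
        return new ResponseInputStream<>(
                GetObjectResponse.builder().eTag("test-etag").build(),
                AbortableInputStream.create(
                        new java.io.ByteArrayInputStream(slice), () -> { }));
    }

If honouring request.range() makes the positioned reads pass, the bug is in 
the mock, not in the s3a code.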

I'm not going to look at the code here; the way to get anything like that 
looked at is to share it as a GitHub reference. Anyway, this is not a 
JIRA-class issue; not yet, anyway. This is the kind of problem to raise on 
the developer mailing list.

For that reason, I'm going to close it as WORKSFORME. Sorry. Stick the code 
on GitHub as a gist or something and discuss it on the Hadoop developer list. 
If it really is a bug in the s3a filesystem code, this JIRA can be re-opened.
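
As to the closing question: the expectation comes from the PositionedReadable 
contract. readFully(position, buffer, offset, length) must read exactly 
length bytes starting at the absolute offset position in the file, 
independently of the stream's current seek position, and throw an 
EOFException if the file ends before the read completes. As an in-memory 
reference of those semantics (the method name is made up for illustration; it 
is not a Hadoop API):

    // Reference semantics only: what readFully(position, ...) promises.
    static void readFullyReference(byte[] file, long position,
            byte[] buffer, int offset, int length) throws java.io.EOFException {
        if (position < 0 || position + length > file.length) {
            throw new java.io.EOFException("requested range goes past the end of the file");
        }
        // copy exactly `length` bytes from the absolute `position`
        System.arraycopy(file, (int) position, buffer, offset, length);
    }

So expecting "cont" from a 4-byte read at offset 5 of "test_content" is 
correct; getting "test" back means the byte source itself is starting from 
zero, which again points at the mock setup.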

> S3A : S3AInputStream positioned readFully Expectation
> -----------------------------------------------------
>
>                 Key: HADOOP-19220
>                 URL: https://issues.apache.org/jira/browse/HADOOP-19220
>             Project: Hadoop Common
>          Issue Type: Bug
>          Components: fs/s3
>            Reporter: Vinay Devadiga
>            Priority: Major
>
> So basically I was trying to write a unit test for the S3AInputStream 
> readFully method:
> package org.apache.hadoop.fs.s3a;
> import java.io.EOFException;
> import java.io.FilterInputStream;
> import java.io.IOException;
> import java.io.InputStream;
> import java.net.SocketException;
> import java.net.URI;
> import java.nio.ByteBuffer;
> import java.nio.charset.Charset;
> import java.nio.charset.StandardCharsets;
> import java.util.concurrent.CompletableFuture;
> import java.util.concurrent.TimeUnit;
> import org.apache.commons.io.IOUtils;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.fs.s3a.audit.impl.NoopSpan;
> import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
> import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
> import org.apache.hadoop.util.functional.CallableRaisingIOE;
> import org.assertj.core.api.Assertions;
> import org.junit.Before;
> import org.junit.Test;
> import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
> import software.amazon.awssdk.awscore.exception.AwsServiceException;
> import software.amazon.awssdk.core.ResponseInputStream;
> import software.amazon.awssdk.http.AbortableInputStream;
> import software.amazon.awssdk.services.s3.S3Client;
> import software.amazon.awssdk.services.s3.model.GetObjectRequest;
> import software.amazon.awssdk.services.s3.model.GetObjectResponse;
> import static java.lang.Math.min;
> import static java.nio.charset.StandardCharsets.UTF_8;
> import static org.apache.hadoop.fs.s3a.Constants.ASYNC_DRAIN_THRESHOLD;
> import static org.apache.hadoop.fs.s3a.Constants.AWS_REGION;
> import static org.apache.hadoop.fs.s3a.Constants.FS_S3A;
> import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_MIN_SIZE;
> import static org.apache.hadoop.fs.s3a.Constants.S3_CLIENT_FACTORY_IMPL;
> import static org.apache.hadoop.util.functional.FutureIO.eval;
> import static org.assertj.core.api.Assertions.assertThat;
> import static org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType;
> import static org.mockito.ArgumentMatchers.any;
> import static org.mockito.Mockito.never;
> import static org.mockito.Mockito.verify;
> public class TestReadFullyAndPositionalRead {
>     private S3AFileSystem fs;
>     private S3AInputStream input;
>     private S3Client s3;
>     private static final String EMPTY = "";
>     private static final String INPUT = "test_content";
>     @Before
>     public void setUp() throws IOException {
>         Configuration conf = createConfiguration();
>         fs = new S3AFileSystem();
>         URI uri = URI.create(FS_S3A + "://" + MockS3AFileSystem.BUCKET);
>         // Unset S3CSE property from config to avoid pathIOE.
>         conf.unset(Constants.S3_ENCRYPTION_ALGORITHM);
>         fs.initialize(uri, conf);
>         s3 = fs.getS3AInternals().getAmazonS3Client("mocking");
>     }
>     public Configuration createConfiguration() {
>         Configuration conf = new Configuration();
>         conf.setClass(S3_CLIENT_FACTORY_IMPL, MockS3ClientFactory.class,
>                 S3ClientFactory.class);
>         // use minimum multipart size for faster triggering
>         conf.setLong(Constants.MULTIPART_SIZE, MULTIPART_MIN_SIZE);
>         conf.setInt(Constants.S3A_BUCKET_PROBE, 1);
>         // this is so stream draining is always blocking, allowing assertions
>         // to be safely made without worrying about any race conditions
>         conf.setInt(ASYNC_DRAIN_THRESHOLD, Integer.MAX_VALUE);
>         // set the region to avoid the getBucketLocation on FS init.
>         conf.set(AWS_REGION, "eu-west-1");
>         return conf;
>     }
>     @Test
>     public void testReadFullyFromBeginning() throws IOException {
>         input = getMockedS3AInputStream(INPUT);
>         byte[] byteArray = new byte[INPUT.length()];
>         input.readFully(0, byteArray, 0, byteArray.length);
>         assertThat(new String(byteArray, UTF_8)).isEqualTo(INPUT);
>     }
>     @Test
>     public void testReadFullyWithOffsetAndLength() throws IOException {
>         input = getMockedS3AInputStream(INPUT);
>         byte[] byteArray = new byte[4];
>         input.readFully(5, byteArray, 0, 4);
>         assertThat(new String(byteArray, UTF_8)).isEqualTo("cont");
>     }
>     @Test
>     public void testReadFullyWithOffsetBeyondStream() throws IOException {
>         input = getMockedS3AInputStream(INPUT);
>         byte[] byteArray = new byte[10];
>         assertThatExceptionOfType(EOFException.class)
>                 .isThrownBy(() -> input.readFully(20, byteArray, 0, 10));
>     }
>     private S3AInputStream getMockedS3AInputStream(String input) {
>         Path path = new Path("test-path");
>         String eTag = "test-etag";
>         String versionId = "test-version-id";
>         String owner = "test-owner";
>         S3AFileStatus s3AFileStatus = new S3AFileStatus(input.length(), 0,
>                 path, input.length(), owner, eTag, versionId);
>         S3ObjectAttributes s3ObjectAttributes = new S3ObjectAttributes(
>                 fs.getBucket(), path, fs.pathToKey(path),
>                 fs.getS3EncryptionAlgorithm(),
>                 new EncryptionSecrets().getEncryptionKey(),
>                 eTag, versionId, input.length());
>         S3AReadOpContext s3AReadOpContext =
>                 fs.createReadContext(s3AFileStatus, NoopSpan.INSTANCE);
>         return new S3AInputStream(s3AReadOpContext, s3ObjectAttributes,
>                 getMockedInputStreamCallback(input),
>                 s3AReadOpContext.getS3AStatisticsContext().newInputStreamStatistics(),
>                 BlockingThreadPoolExecutorService.newInstance(2, 40, 60,
>                         TimeUnit.SECONDS, "s3a-bounded"));
>     }
>     private S3AInputStream.InputStreamCallbacks getMockedInputStreamCallback(String input) {
>         GetObjectResponse objectResponse = GetObjectResponse.builder()
>                 .eTag("test-etag").build();
>         ResponseInputStream<GetObjectResponse>[] responseInputStreams =
>                 new ResponseInputStream[] {
>                 getMockedInputStream(objectResponse, true, input),
>                 getMockedInputStream(objectResponse, true, input),
>                 getMockedInputStream(objectResponse, false, input)
>         };
>         return new S3AInputStream.InputStreamCallbacks() {
>             private Integer mockedS3ObjectIndex = 0;
>             @Override
>             public ResponseInputStream<GetObjectResponse> getObject(GetObjectRequest request) {
>                 mockedS3ObjectIndex++;
>                 if (mockedS3ObjectIndex == 3) {
>                     throw AwsServiceException.builder()
>                             .message("Failed to get S3Object")
>                             .awsErrorDetails(AwsErrorDetails.builder().errorCode("test-code").build())
>                             .build();
>                 }
>                 return responseInputStreams[min(mockedS3ObjectIndex,
>                         responseInputStreams.length) - 1];
>             }
>             @Override
>             public GetObjectRequest.Builder newGetRequestBuilder(String key) {
>                 return GetObjectRequest.builder().bucket(fs.getBucket()).key(key);
>             }
>             @Override
>             public <T> CompletableFuture<T> submit(final CallableRaisingIOE<T> task) {
>                 return eval(task);
>             }
>             @Override
>             public void close() {
>             }
>         };
>     }
>     private ResponseInputStream<GetObjectResponse> getMockedInputStream(
>             GetObjectResponse response, boolean success, String input) {
>         FilterInputStream stream = new FilterInputStream(
>                 AbortableInputStream.create(
>                         IOUtils.toInputStream(input, StandardCharsets.UTF_8),
>                         () -> { })) {
>             @Override
>             public void close() throws IOException {
>                 super.close();
>                 if (!success) {
>                     throw new SocketException("Socket closed");
>                 }
>             }
>         };
>         return new ResponseInputStream<>(response, stream);
>     }
> }
> Now this is the failure:
> [ERROR]   TestReadFullyAndPositionalRead.testPositionalReadWithOffsetAndLength:136 
> expected:<"[con]t"> but was:<"[tes]t">
> It is not adhering to the position parameter and is reading the initial 
> bytes only.
> What is the expectation of the readFully function in S3AInputStream?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
