[ 
https://issues.apache.org/jira/browse/HADOOP-19220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17864783#comment-17864783
 ] 

Steve Loughran commented on HADOOP-19220:
-----------------------------------------

FWIW, with the proposed move to the async client everywhere, I want all S3 client 
access to be hidden behind the S3A store API; this may assist mocking.

> S3A : S3AInputStream positioned readFully Expectation
> -----------------------------------------------------
>
>                 Key: HADOOP-19220
>                 URL: https://issues.apache.org/jira/browse/HADOOP-19220
>             Project: Hadoop Common
>          Issue Type: Test
>          Components: fs/s3
>            Reporter: Vinay Devadiga
>            Priority: Major
>
> So basically I was trying to write a unit test for the S3AInputStream 
> readFully method:
> package org.apache.hadoop.fs.s3a;
> import java.io.EOFException;
> import java.io.FilterInputStream;
> import java.io.IOException;
> import java.io.InputStream;
> import java.net.SocketException;
> import java.net.URI;
> import java.nio.ByteBuffer;
> import java.nio.charset.Charset;
> import java.nio.charset.StandardCharsets;
> import java.util.concurrent.CompletableFuture;
> import java.util.concurrent.TimeUnit;
> import org.apache.commons.io.IOUtils;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.fs.s3a.audit.impl.NoopSpan;
> import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
> import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
> import org.apache.hadoop.util.functional.CallableRaisingIOE;
> import org.assertj.core.api.Assertions;
> import org.junit.Before;
> import org.junit.Test;
> import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
> import software.amazon.awssdk.awscore.exception.AwsServiceException;
> import software.amazon.awssdk.core.ResponseInputStream;
> import software.amazon.awssdk.http.AbortableInputStream;
> import software.amazon.awssdk.services.s3.S3Client;
> import software.amazon.awssdk.services.s3.model.GetObjectRequest;
> import software.amazon.awssdk.services.s3.model.GetObjectResponse;
> import static java.lang.Math.min;
> import static java.nio.charset.StandardCharsets.UTF_8;
> import static org.apache.hadoop.fs.s3a.Constants.ASYNC_DRAIN_THRESHOLD;
> import static org.apache.hadoop.fs.s3a.Constants.AWS_REGION;
> import static org.apache.hadoop.fs.s3a.Constants.FS_S3A;
> import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_MIN_SIZE;
> import static org.apache.hadoop.fs.s3a.Constants.S3_CLIENT_FACTORY_IMPL;
> import static org.apache.hadoop.util.functional.FutureIO.eval;
> import static org.assertj.core.api.Assertions.assertThat;
> import static 
> org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType;
> import static org.mockito.ArgumentMatchers.any;
> import static org.mockito.Mockito.never;
> import static org.mockito.Mockito.verify;
> public class TestReadFullyAndPositionalRead {
>     private S3AFileSystem fs;
>     private S3AInputStream input;
>     private S3Client s3;
>     private static final String EMPTY = "";
>     private static final String INPUT = "test_content";
>     @Before
>     public void setUp() throws IOException {
>         Configuration conf = createConfiguration();
>         fs = new S3AFileSystem();
>         URI uri = URI.create(FS_S3A + "://" + MockS3AFileSystem.BUCKET);
>         // Unset S3CSE property from config to avoid pathIOE.
>         conf.unset(Constants.S3_ENCRYPTION_ALGORITHM);
>         fs.initialize(uri, conf);
>         s3 = fs.getS3AInternals().getAmazonS3Client("mocking");
>     }
>     public Configuration createConfiguration() {
>         Configuration conf = new Configuration();
>         conf.setClass(S3_CLIENT_FACTORY_IMPL, MockS3ClientFactory.class, 
> S3ClientFactory.class);
>         // use minimum multipart size for faster triggering
>         conf.setLong(Constants.MULTIPART_SIZE, MULTIPART_MIN_SIZE);
>         conf.setInt(Constants.S3A_BUCKET_PROBE, 1);
>         // this is so stream draining is always blocking, allowing assertions 
> to be safely made without worrying about any race conditions
>         conf.setInt(ASYNC_DRAIN_THRESHOLD, Integer.MAX_VALUE);
>         // set the region to avoid the getBucketLocation on FS init.
>         conf.set(AWS_REGION, "eu-west-1");
>         return conf;
>     }
>     @Test
>     public void testReadFullyFromBeginning() throws IOException {
>         input = getMockedS3AInputStream(INPUT);
>         byte[] byteArray = new byte[INPUT.length()];
>         input.readFully(0, byteArray, 0, byteArray.length);
>         assertThat(new String(byteArray, UTF_8)).isEqualTo(INPUT);
>     }
>     @Test
>     public void testReadFullyWithOffsetAndLength() throws IOException {
>         input = getMockedS3AInputStream(INPUT);
>         byte[] byteArray = new byte[4];
>         input.readFully(5, byteArray, 0, 4);
>         assertThat(new String(byteArray, UTF_8)).isEqualTo("cont");
>     }
>     @Test
>     public void testReadFullyWithOffsetBeyondStream() throws IOException {
>         input = getMockedS3AInputStream(INPUT);
>         byte[] byteArray = new byte[10];
>         assertThatExceptionOfType(EOFException.class)
>                 .isThrownBy(() -> input.readFully(20, byteArray, 0, 10));
>     }
>     private S3AInputStream getMockedS3AInputStream(String input) {
>         Path path = new Path("test-path");
>         String eTag = "test-etag";
>         String versionId = "test-version-id";
>         String owner = "test-owner";
>         S3AFileStatus s3AFileStatus = new S3AFileStatus(input.length(), 0, 
> path, input.length(), owner, eTag, versionId);
>         S3ObjectAttributes s3ObjectAttributes = new S3ObjectAttributes(
>                 fs.getBucket(), path, fs.pathToKey(path), 
> fs.getS3EncryptionAlgorithm(), new EncryptionSecrets().getEncryptionKey(), 
> eTag, versionId, input.length());
>         S3AReadOpContext s3AReadOpContext = 
> fs.createReadContext(s3AFileStatus, NoopSpan.INSTANCE);
>         return new S3AInputStream(s3AReadOpContext, s3ObjectAttributes, 
> getMockedInputStreamCallback(input), 
> s3AReadOpContext.getS3AStatisticsContext().newInputStreamStatistics(), 
> BlockingThreadPoolExecutorService.newInstance(2, 40, 60, TimeUnit.SECONDS, 
> "s3a-bounded"));
>     }
>     private S3AInputStream.InputStreamCallbacks 
> getMockedInputStreamCallback(String input) {
>         GetObjectResponse objectResponse = 
> GetObjectResponse.builder().eTag("test-etag").build();
>         ResponseInputStream<GetObjectResponse>[] responseInputStreams = new 
> ResponseInputStream[] {
>                 getMockedInputStream(objectResponse, true, input),
>                 getMockedInputStream(objectResponse, true, input),
>                 getMockedInputStream(objectResponse, false, input)
>         };
>         return new S3AInputStream.InputStreamCallbacks() {
>             private Integer mockedS3ObjectIndex = 0;
>             @Override
>             public ResponseInputStream<GetObjectResponse> 
> getObject(GetObjectRequest request) {
>                 mockedS3ObjectIndex++;
>                 if (mockedS3ObjectIndex == 3) {
>                     throw AwsServiceException.builder()
>                             .message("Failed to get S3Object")
>                             
> .awsErrorDetails(AwsErrorDetails.builder().errorCode("test-code").build())
>                             .build();
>                 }
>                 return responseInputStreams[min(mockedS3ObjectIndex, 
> responseInputStreams.length) - 1];
>             }
>             @Override
>             public GetObjectRequest.Builder newGetRequestBuilder(String key) {
>                 return 
> GetObjectRequest.builder().bucket(fs.getBucket()).key(key);
>             }
>             @Override
>             public <T> CompletableFuture<T> submit(final 
> CallableRaisingIOE<T> task) {
>                 return eval(task);
>             }
>             @Override
>             public void close() {
>             }
>         };
>     }
>     private ResponseInputStream<GetObjectResponse> getMockedInputStream(
>             GetObjectResponse response, boolean success, String input) {
>         FilterInputStream stream = new 
> FilterInputStream(AbortableInputStream.create(
>                 IOUtils.toInputStream(input, StandardCharsets.UTF_8), () -> {
>                 })) {
>             @Override
>             public void close() throws IOException {
>                 super.close();
>                 if (!success) {
>                     throw new SocketException("Socket closed");
>                 }
>             }
>         };
>         return new ResponseInputStream<>(response, stream);
>     }
> }
> Now this -
> [ERROR]   
> TestReadFullyAndPositionalRead.testPositionalReadWithOffsetAndLength:136 
> expected:<"[con]t"> but was:<"[tes]t">
> is the failure: it's not adhering to the position parameter and is reading the 
> initial bytes only.
> What is the expected behavior of the readFully function in S3AInputStream?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org

Reply via email to