[ https://issues.apache.org/jira/browse/HADOOP-19220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17864783#comment-17864783 ]
Steve Loughran commented on HADOOP-19220:
-----------------------------------------

FWIW, with the proposed move to the async client everywhere, I want all S3 client access to be hidden behind the S3A store API; this may assist mocking.
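To illustrate the kind of indirection meant here (a minimal hypothetical sketch, not the actual S3A store API): if every GET goes through a store-facing interface like the one below, unit tests can supply an in-memory implementation directly instead of mocking the SDK client.

{code:java}
import java.io.IOException;

import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;

/**
 * Hypothetical store-facing interface; the real S3A store API will differ.
 * Production code binds this to the (async) S3 client; tests implement it
 * in-memory, so the SDK client itself never needs to be mocked.
 */
public interface ObjectReadOperations {

  /** Issue a GET for an object, honouring any range in the request. */
  ResponseInputStream<GetObjectResponse> getObject(GetObjectRequest request)
      throws IOException;
}
{code}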
S3A : S3AInputStream positioned readFully Expectation
-----------------------------------------------------

                Key: HADOOP-19220
                URL: https://issues.apache.org/jira/browse/HADOOP-19220
            Project: Hadoop Common
         Issue Type: Test
         Components: fs/s3
           Reporter: Vinay Devadiga
           Priority: Major

So basically I was trying to write a unit test for the S3AInputStream readFully method:

{code:java}
package org.apache.hadoop.fs.s3a;

import java.io.EOFException;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.SocketException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.audit.impl.NoopSpan;
import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
import org.apache.hadoop.util.functional.CallableRaisingIOE;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Test;
import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.http.AbortableInputStream;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;

import static java.lang.Math.min;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.hadoop.fs.s3a.Constants.ASYNC_DRAIN_THRESHOLD;
import static org.apache.hadoop.fs.s3a.Constants.AWS_REGION;
import static org.apache.hadoop.fs.s3a.Constants.FS_S3A;
import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_MIN_SIZE;
import static org.apache.hadoop.fs.s3a.Constants.S3_CLIENT_FACTORY_IMPL;
import static org.apache.hadoop.util.functional.FutureIO.eval;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;

public class TestReadFullyAndPositionalRead {

  private S3AFileSystem fs;
  private S3AInputStream input;
  private S3Client s3;

  private static final String EMPTY = "";
  private static final String INPUT = "test_content";

  @Before
  public void setUp() throws IOException {
    Configuration conf = createConfiguration();
    fs = new S3AFileSystem();
    URI uri = URI.create(FS_S3A + "://" + MockS3AFileSystem.BUCKET);
    // Unset S3CSE property from config to avoid pathIOE.
    conf.unset(Constants.S3_ENCRYPTION_ALGORITHM);
    fs.initialize(uri, conf);
    s3 = fs.getS3AInternals().getAmazonS3Client("mocking");
  }

  public Configuration createConfiguration() {
    Configuration conf = new Configuration();
    conf.setClass(S3_CLIENT_FACTORY_IMPL, MockS3ClientFactory.class,
        S3ClientFactory.class);
    // use minimum multipart size for faster triggering
    conf.setLong(Constants.MULTIPART_SIZE, MULTIPART_MIN_SIZE);
    conf.setInt(Constants.S3A_BUCKET_PROBE, 1);
    // this is so stream draining is always blocking, allowing assertions
    // to be safely made without worrying about any race conditions
    conf.setInt(ASYNC_DRAIN_THRESHOLD, Integer.MAX_VALUE);
    // set the region to avoid the getBucketLocation on FS init.
    conf.set(AWS_REGION, "eu-west-1");
    return conf;
  }

  @Test
  public void testReadFullyFromBeginning() throws IOException {
    input = getMockedS3AInputStream(INPUT);
    byte[] byteArray = new byte[INPUT.length()];
    input.readFully(0, byteArray, 0, byteArray.length);
    assertThat(new String(byteArray, UTF_8)).isEqualTo(INPUT);
  }

  @Test
  public void testReadFullyWithOffsetAndLength() throws IOException {
    input = getMockedS3AInputStream(INPUT);
    byte[] byteArray = new byte[4];
    input.readFully(5, byteArray, 0, 4);
    assertThat(new String(byteArray, UTF_8)).isEqualTo("cont");
  }

  @Test
  public void testReadFullyWithOffsetBeyondStream() throws IOException {
    input = getMockedS3AInputStream(INPUT);
    byte[] byteArray = new byte[10];
    assertThatExceptionOfType(EOFException.class)
        .isThrownBy(() -> input.readFully(20, byteArray, 0, 10));
  }

  private S3AInputStream getMockedS3AInputStream(String input) {
    Path path = new Path("test-path");
    String eTag = "test-etag";
    String versionId = "test-version-id";
    String owner = "test-owner";
    S3AFileStatus s3AFileStatus = new S3AFileStatus(input.length(), 0,
        path, input.length(), owner, eTag, versionId);
    S3ObjectAttributes s3ObjectAttributes = new S3ObjectAttributes(
        fs.getBucket(), path, fs.pathToKey(path),
        fs.getS3EncryptionAlgorithm(), new EncryptionSecrets().getEncryptionKey(),
        eTag, versionId, input.length());
    S3AReadOpContext s3AReadOpContext =
        fs.createReadContext(s3AFileStatus, NoopSpan.INSTANCE);
    return new S3AInputStream(s3AReadOpContext, s3ObjectAttributes,
        getMockedInputStreamCallback(input),
        s3AReadOpContext.getS3AStatisticsContext().newInputStreamStatistics(),
        BlockingThreadPoolExecutorService.newInstance(2, 40, 60, TimeUnit.SECONDS,
            "s3a-bounded"));
  }

  private S3AInputStream.InputStreamCallbacks getMockedInputStreamCallback(String input) {
    GetObjectResponse objectResponse =
        GetObjectResponse.builder().eTag("test-etag").build();
    ResponseInputStream<GetObjectResponse>[] responseInputStreams =
        new ResponseInputStream[] {
            getMockedInputStream(objectResponse, true, input),
            getMockedInputStream(objectResponse, true, input),
            getMockedInputStream(objectResponse, false, input)
        };
    return new S3AInputStream.InputStreamCallbacks() {
      private Integer mockedS3ObjectIndex = 0;

      @Override
      public ResponseInputStream<GetObjectResponse> getObject(GetObjectRequest request) {
        mockedS3ObjectIndex++;
        if (mockedS3ObjectIndex == 3) {
          throw AwsServiceException.builder()
              .message("Failed to get S3Object")
              .awsErrorDetails(AwsErrorDetails.builder().errorCode("test-code").build())
              .build();
        }
        return responseInputStreams[min(mockedS3ObjectIndex,
            responseInputStreams.length) - 1];
      }

      @Override
      public GetObjectRequest.Builder newGetRequestBuilder(String key) {
        return GetObjectRequest.builder().bucket(fs.getBucket()).key(key);
      }

      @Override
      public <T> CompletableFuture<T> submit(final CallableRaisingIOE<T> task) {
        return eval(task);
      }

      @Override
      public void close() {
      }
    };
  }

  private ResponseInputStream<GetObjectResponse> getMockedInputStream(
      GetObjectResponse response, boolean success, String input) {
    FilterInputStream stream = new FilterInputStream(AbortableInputStream.create(
        IOUtils.toInputStream(input, StandardCharsets.UTF_8), () -> {
        })) {
      @Override
      public void close() throws IOException {
        super.close();
        if (!success) {
          throw new SocketException("Socket closed");
        }
      }
    };
    return new ResponseInputStream<>(response, stream);
  }
}
{code}
Now this is the failure:

[ERROR] TestReadFullyAndPositionalRead.testPositionalReadWithOffsetAndLength:136 expected:<"[con]t"> but was:<"[tes]t">

It is not adhering to the position parameter and is reading only the initial bytes.

What is the expectation of the readFully function in S3AInputStream?
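For reference, S3AInputStream implements the positioned-read contract of org.apache.hadoop.fs.PositionedReadable: readFully(position, buffer, offset, length) must read exactly length bytes starting at the given absolute position, without being affected by (or changing) the stream's current offset, and must throw EOFException if the object ends before length bytes are available. Note that the mocked getObject above returns the same full-content stream no matter what range the GetObjectRequest carries, which would explain why the initial bytes come back on a positioned read. Below is a sketch of a range-aware replacement for that getObject override; it assumes the "bytes=start-end" range header form and deliberately simplifies the parsing.

{code:java}
// Sketch only: honour the Range header of the GetObjectRequest so that a
// positioned read sees the matching slice of the object, not its start.
@Override
public ResponseInputStream<GetObjectResponse> getObject(GetObjectRequest request) {
  String body = INPUT;
  String range = request.range();          // e.g. "bytes=5-11"; may be null
  if (range != null && range.startsWith("bytes=")) {
    String[] bounds = range.substring("bytes=".length()).split("-");
    int start = Integer.parseInt(bounds[0]);
    // the end bound is inclusive; clamp it to the last byte of the object
    int end = Math.min(Integer.parseInt(bounds[1]), body.length() - 1);
    body = start <= end ? body.substring(start, end + 1) : "";
  }
  return getMockedInputStream(
      GetObjectResponse.builder().eTag("test-etag").build(), true, body);
}
{code}

With a range-aware mock along these lines, readFully(5, byteArray, 0, 4) would fetch a "bytes=5-..." slice and return "cont" as the test expects.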