Vinay Devadiga created HADOOP-19220:
---------------------------------------
Summary: S3A : S3AInputStream positioned readFully Expectation
Key: HADOOP-19220
URL: https://issues.apache.org/jira/browse/HADOOP-19220
Project: Hadoop Common
Issue Type: Bug
Components: fs/s3
Reporter: Vinay Devadiga
I was writing some unit tests for the readFully method of S3AInputStream and hit the failure described below. The test class:
package org.apache.hadoop.fs.s3a;
import java.io.EOFException;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.SocketException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.audit.impl.NoopSpan;
import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
import org.apache.hadoop.util.functional.CallableRaisingIOE;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Test;
import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.http.AbortableInputStream;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import static java.lang.Math.min;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.hadoop.fs.s3a.Constants.ASYNC_DRAIN_THRESHOLD;
import static org.apache.hadoop.fs.s3a.Constants.AWS_REGION;
import static org.apache.hadoop.fs.s3a.Constants.FS_S3A;
import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_MIN_SIZE;
import static org.apache.hadoop.fs.s3a.Constants.S3_CLIENT_FACTORY_IMPL;
import static org.apache.hadoop.util.functional.FutureIO.eval;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
public class TestReadFullyAndPositionalRead {

  private S3AFileSystem fs;
  private S3AInputStream input;
  private S3Client s3;

  private static final String EMPTY = "";
  private static final String INPUT = "test_content";

  @Before
  public void setUp() throws IOException {
    Configuration conf = createConfiguration();
    fs = new S3AFileSystem();
    URI uri = URI.create(FS_S3A + "://" + MockS3AFileSystem.BUCKET);
    // Unset S3CSE property from config to avoid pathIOE.
    conf.unset(Constants.S3_ENCRYPTION_ALGORITHM);
    fs.initialize(uri, conf);
    s3 = fs.getS3AInternals().getAmazonS3Client("mocking");
  }

  public Configuration createConfiguration() {
    Configuration conf = new Configuration();
    conf.setClass(S3_CLIENT_FACTORY_IMPL, MockS3ClientFactory.class,
        S3ClientFactory.class);
    // use minimum multipart size for faster triggering
    conf.setLong(Constants.MULTIPART_SIZE, MULTIPART_MIN_SIZE);
    conf.setInt(Constants.S3A_BUCKET_PROBE, 1);
    // this is so stream draining is always blocking, allowing assertions
    // to be safely made without worrying about any race conditions
    conf.setInt(ASYNC_DRAIN_THRESHOLD, Integer.MAX_VALUE);
    // set the region to avoid the getBucketLocation on FS init.
    conf.set(AWS_REGION, "eu-west-1");
    return conf;
  }
  @Test
  public void testReadFullyFromBeginning() throws IOException {
    input = getMockedS3AInputStream(INPUT);
    byte[] byteArray = new byte[INPUT.length()];
    input.readFully(0, byteArray, 0, byteArray.length);
    assertThat(new String(byteArray, UTF_8)).isEqualTo(INPUT);
  }

  @Test
  public void testReadFullyWithOffsetAndLength() throws IOException {
    input = getMockedS3AInputStream(INPUT);
    byte[] byteArray = new byte[4];
    input.readFully(5, byteArray, 0, 4);
    assertThat(new String(byteArray, UTF_8)).isEqualTo("cont");
  }

  @Test
  public void testReadFullyWithOffsetBeyondStream() throws IOException {
    input = getMockedS3AInputStream(INPUT);
    byte[] byteArray = new byte[10];
    assertThatExceptionOfType(EOFException.class)
        .isThrownBy(() -> input.readFully(20, byteArray, 0, 10));
  }
  private S3AInputStream getMockedS3AInputStream(String input) {
    Path path = new Path("test-path");
    String eTag = "test-etag";
    String versionId = "test-version-id";
    String owner = "test-owner";

    S3AFileStatus s3AFileStatus = new S3AFileStatus(input.length(), 0,
        path, input.length(), owner, eTag, versionId);

    S3ObjectAttributes s3ObjectAttributes = new S3ObjectAttributes(
        fs.getBucket(), path, fs.pathToKey(path),
        fs.getS3EncryptionAlgorithm(),
        new EncryptionSecrets().getEncryptionKey(),
        eTag, versionId, input.length());

    S3AReadOpContext s3AReadOpContext = fs.createReadContext(s3AFileStatus,
        NoopSpan.INSTANCE);

    return new S3AInputStream(s3AReadOpContext, s3ObjectAttributes,
        getMockedInputStreamCallback(input),
        s3AReadOpContext.getS3AStatisticsContext().newInputStreamStatistics(),
        BlockingThreadPoolExecutorService.newInstance(2, 40, 60,
            TimeUnit.SECONDS, "s3a-bounded"));
  }
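  /**
   * Callbacks whose getObject serves the mocked streams in order: the
   * first two GETs succeed, the third throws an AwsServiceException, and
   * any later GET returns the stream whose close() fails.
   */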
  private S3AInputStream.InputStreamCallbacks getMockedInputStreamCallback(
      String input) {
    GetObjectResponse objectResponse = GetObjectResponse.builder()
        .eTag("test-etag")
        .build();

    ResponseInputStream<GetObjectResponse>[] responseInputStreams =
        new ResponseInputStream[] {
            getMockedInputStream(objectResponse, true, input),
            getMockedInputStream(objectResponse, true, input),
            getMockedInputStream(objectResponse, false, input)
        };

    return new S3AInputStream.InputStreamCallbacks() {
      private Integer mockedS3ObjectIndex = 0;

      @Override
      public ResponseInputStream<GetObjectResponse> getObject(
          GetObjectRequest request) {
        mockedS3ObjectIndex++;
        if (mockedS3ObjectIndex == 3) {
          throw AwsServiceException.builder()
              .message("Failed to get S3Object")
              .awsErrorDetails(
                  AwsErrorDetails.builder().errorCode("test-code").build())
              .build();
        }
        return responseInputStreams[
            min(mockedS3ObjectIndex, responseInputStreams.length) - 1];
      }

      @Override
      public GetObjectRequest.Builder newGetRequestBuilder(String key) {
        return GetObjectRequest.builder().bucket(fs.getBucket()).key(key);
      }

      @Override
      public <T> CompletableFuture<T> submit(final CallableRaisingIOE<T> task) {
        return eval(task);
      }

      @Override
      public void close() {
      }
    };
  }
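  /**
   * Create a response stream over the test data; when {@code success} is
   * false, close() throws a SocketException to simulate a broken
   * connection.
   */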
  private ResponseInputStream<GetObjectResponse> getMockedInputStream(
      GetObjectResponse response, boolean success, String input) {
    FilterInputStream stream = new FilterInputStream(
        AbortableInputStream.create(
            IOUtils.toInputStream(input, StandardCharsets.UTF_8), () -> {
            })) {
      @Override
      public void close() throws IOException {
        super.close();
        if (!success) {
          throw new SocketException("Socket closed");
        }
      }
    };
    return new ResponseInputStream<>(response, stream);
  }
}
Running this, the failure is:

[ERROR]
TestReadFullyAndPositionalRead.testPositionalReadWithOffsetAndLength:136
expected:<"[con]t"> but was:<"[tes]t">

The read is not adhering to the position parameter: it returns only the initial bytes of the object.

What is the expectation of the readFully function in S3AInputStream?
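For what it's worth, my guess is that the mock above always serves the object from offset 0, while S3AInputStream reopens positioned reads with a ranged GET. A minimal sketch of a range-aware variant of the mock (the helper name is hypothetical; it assumes the Range header arrives via GetObjectRequest.range() as "bytes=start-end" and uses commons-io to skip):

  // Hypothetical range-aware variant of getMockedInputStream: honor the
  // Range header of the GET request so positioned reads see the bytes at
  // the requested offset instead of the start of the object.
  private ResponseInputStream<GetObjectResponse> getRangedInputStream(
      GetObjectRequest request, GetObjectResponse response, String input)
      throws IOException {
    InputStream data = IOUtils.toInputStream(input, StandardCharsets.UTF_8);
    String range = request.range();  // e.g. "bytes=5-8"; null if unset
    if (range != null && range.startsWith("bytes=")) {
      // skip to the start offset of the requested range
      long start = Long.parseLong(
          range.substring("bytes=".length()).split("-")[0]);
      IOUtils.skipFully(data, start);
    }
    return new ResponseInputStream<>(response,
        AbortableInputStream.create(data, () -> {
        }));
  }

With something like this wired into the callbacks, readFully(5, byteArray, 0, 4) would see "cont"; if readFully is instead meant to read from the current stream position, then the test expectation itself is wrong, which is really my question.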