[ https://issues.apache.org/jira/browse/APEXMALHAR-2174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15407244#comment-15407244 ]
ASF GitHub Bot commented on APEXMALHAR-2174:
--------------------------------------------
Github user yogidevendra commented on a diff in the pull request:
https://github.com/apache/apex-malhar/pull/360#discussion_r73466833
--- Diff:
library/src/main/java/com/datatorrent/lib/io/fs/S3BlockReader.java ---
@@ -47,63 +56,144 @@ public S3BlockReader()
public void setup(Context.OperatorContext context)
{
super.setup(context);
- s3bucketUri = fs.getScheme() + "://" + bucketName;
+ s3Client = new AmazonS3Client(new BasicAWSCredentials(accessKey, secretAccessKey));
+ ((S3BlockReaderContext)readerContext).setBucketName(bucketName);
+ ((S3BlockReaderContext)readerContext).setS3Client(s3Client);
}
/**
* Extracts the bucket name from the given uri
* @param s3uri s3 uri
* @return name of the bucket
*/
- @VisibleForTesting
protected static String extractBucket(String s3uri)
{
return s3uri.substring(s3uri.indexOf('@') + 1, s3uri.indexOf("/", s3uri.indexOf('@')));
}
/**
- * Create the stream from the bucket uri and block path.
+ * Extracts the accessKey from the given uri
+ * @param s3uri given s3 uri
+ * @return the accessKey
+ */
+ protected static String extractAccessKey(String s3uri)
+ {
+ return s3uri.substring(s3uri.indexOf("://") + 3, s3uri.indexOf(':', s3uri.indexOf("://") + 3));
+ }
+
+ /**
+ * Extracts the secretAccessKey from the given uri
+ * @param s3uri given s3uri
+ * @return the secretAccessKey
+ */
+ protected static String extractSecretAccessKey(String s3uri)
+ {
+ return s3uri.substring(s3uri.indexOf(':', s3uri.indexOf("://") + 1) + 1, s3uri.indexOf('@'));
+ }
+
+ /**
+ * Extract the file path from given block and set it to the readerContext
* @param block block metadata
* @return stream
* @throws IOException
*/
@Override
protected FSDataInputStream setupStream(BlockMetadata.FileBlockMetadata block) throws IOException
{
- FSDataInputStream ins = fs.open(new Path(s3bucketUri + block.getFilePath()));
- ins.seek(block.getOffset());
- return ins;
+ String filePath = block.getFilePath();
+ if (filePath.startsWith("/")) {
--- End diff --
Please add comments mentioning why this is needed.
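
To illustrate the kind of comment being requested, here is a minimal sketch. The diff is cut off at the `startsWith("/")` check, so the body of the branch is not visible; the sketch assumes it strips the leading slash so that the file path can be used as an S3 object key. The class `S3KeySketch` and the method `toObjectKey` are hypothetical names and are not part of the pull request.

```java
public final class S3KeySketch {
  private S3KeySketch() {}

  /**
   * Hypothetical helper: turns a block's file path into an S3 object key.
   * Assumption: the path may carry a leading "/" inherited from the
   * file-system style URI it was derived from.
   */
  public static String toObjectKey(String filePath) {
    // S3 object keys are plain strings, not rooted paths: "/dir/a.txt" and
    // "dir/a.txt" name two different objects. Drop the leading slash so the
    // key matches the object as it is stored in the bucket.
    if (filePath.startsWith("/")) {
      return filePath.substring(1);
    }
    return filePath;
  }

  public static void main(String[] args) {
    System.out.println(toObjectKey("/input/data.txt"));
    System.out.println(toObjectKey("input/data.txt"));
  }
}
```

Both calls in `main` yield the same key, which is the behavior a short inline comment next to the check could spell out.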
> S3 File Reader reading more data than expected
> ----------------------------------------------
>
> Key: APEXMALHAR-2174
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2174
> Project: Apache Apex Malhar
> Issue Type: Bug
> Reporter: Chaitanya
> Assignee: Chaitanya
>
> This was observed through AWS billing.
> The issue might be S3InputStream.read(), which is used in readEntity().
> Reading the block can be achieved through the AmazonS3 APIs, so I am
> proposing the following solution:
> ```
> GetObjectRequest rangeObjectRequest = new GetObjectRequest(
> bucketName, key);
> rangeObjectRequest.setRange(startByte, noOfBytes);
> S3Object objectPortion = s3Client.getObject(rangeObjectRequest);
> S3ObjectInputStream wrappedStream = objectPortion.getObjectContent();
> byte[] record = ByteStreams.toByteArray(wrappedStream);
> ```
> Advantage of this solution: parallel reads will work for all types of S3 file
> systems.
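
One detail worth checking in the proposed snippet: in the AWS SDK for Java 1.x, `GetObjectRequest.setRange(long start, long end)` takes an inclusive start and an inclusive end position, not a start and a byte count. The sketch below shows only the arithmetic for turning a block's offset and length into such a range; it does not call the SDK. The class `ByteRangeSketch` and its methods are hypothetical names chosen for illustration.

```java
public final class ByteRangeSketch {
  private ByteRangeSketch() {}

  /**
   * Returns {start, endInclusive} for a block that begins at offset and
   * spans length bytes. Hypothetical helper, not part of the pull request.
   */
  public static long[] inclusiveRange(long offset, long length) {
    if (offset < 0 || length <= 0) {
      throw new IllegalArgumentException("offset must be >= 0 and length > 0");
    }
    // The last byte of the block sits at offset + length - 1.
    return new long[] {offset, offset + length - 1};
  }

  /** Formats the same range as an HTTP Range header value. */
  public static String rangeHeader(long offset, long length) {
    long[] range = inclusiveRange(offset, length);
    return "bytes=" + range[0] + "-" + range[1];
  }

  public static void main(String[] args) {
    System.out.println(rangeHeader(0, 1024));    // bytes=0-1023
    System.out.println(rangeHeader(1024, 1024)); // bytes=1024-2047
  }
}
```

With this convention, adjacent blocks request disjoint byte ranges, so each parallel reader downloads only its own block.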