[ 
https://issues.apache.org/jira/browse/FLINK-10963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701718#comment-16701718
 ] 

ASF GitHub Bot commented on FLINK-10963:
----------------------------------------

igalshilman commented on a change in pull request #7161: 
[FLINK-10963][fs-connector, s3] Cleanup tmp S3 objects uploaded as backups of 
in-progress files.
URL: https://github.com/apache/flink/pull/7161#discussion_r237021819
 
 

 ##########
 File path: 
flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3AccessHelper.java
 ##########
 @@ -61,25 +64,50 @@ public HadoopS3MultiPartUploader(S3AFileSystem s3a, Configuration conf) {
 
        @Override
        public String startMultiPartUpload(String key) throws IOException {
-               return s3uploader.initiateMultiPartUpload(key);
+               return s3accessHelper.initiateMultiPartUpload(key);
        }
 
        @Override
        public UploadPartResult uploadPart(String key, String uploadId, int partNumber, InputStream inputStream, long length) throws IOException {
-               final UploadPartRequest uploadRequest = s3uploader.newUploadPartRequest(
+               final UploadPartRequest uploadRequest = s3accessHelper.newUploadPartRequest(
                                key, uploadId, partNumber, MathUtils.checkedDownCast(length), inputStream, null, 0L);
-               return s3uploader.uploadPart(uploadRequest);
+               return s3accessHelper.uploadPart(uploadRequest);
        }
 
        @Override
-       public PutObjectResult uploadIncompletePart(String key, InputStream inputStream, long length) throws IOException {
-               final PutObjectRequest putRequest = s3uploader.createPutObjectRequest(key, inputStream, length);
-               return s3uploader.putObject(putRequest);
+       public PutObjectResult putObject(String key, InputStream inputStream, long length) throws IOException {
+               final PutObjectRequest putRequest = s3accessHelper.createPutObjectRequest(key, inputStream, length);
+               return s3accessHelper.putObject(putRequest);
        }
 
        @Override
        public CompleteMultipartUploadResult commitMultiPartUpload(String destKey, String uploadId, List<PartETag> partETags, long length, AtomicInteger errorCount) throws IOException {
-               return s3uploader.completeMPUwithRetries(destKey, uploadId, partETags, length, errorCount);
+               return s3accessHelper.completeMPUwithRetries(destKey, uploadId, partETags, length, errorCount);
+       }
+
+       @Override
+       public boolean deleteObject(String key) throws IOException {
+               return s3a.delete(new org.apache.hadoop.fs.Path('/' + key), false);
+       }
+
+       @Override
+       public long getObject(String key, File targetLocation) throws IOException {
+               long numBytes = 0L;
+               try (
+                               final OutputStream outStream = new FileOutputStream(targetLocation);
+                               final org.apache.hadoop.fs.FSDataInputStream inStream =
+                                               s3a.open(new org.apache.hadoop.fs.Path('/' + key))
+               ) {
+                       final byte[] buffer = new byte[32 * 1024];
+
+                       int numRead;
+                       while ((numRead = inStream.read(buffer)) > 0) {
 Review comment:
   suggestion: `while ((..) != -1)`
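The suggestion concerns the copy loop at the end of the diff. Per the `InputStream` contract, `read(byte[])` signals end-of-stream with -1, so `!= -1` is the reliable termination test; a `> 0` check would also stop on a legal zero-byte return and silently truncate the copy. A minimal, self-contained sketch of the loop written as the reviewer suggests (class and method names here are illustrative, not from the PR):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class CopyLoop {

    /**
     * Copies everything from in to out and returns the byte count.
     * read() returns -1 only at end-of-stream, so testing != -1
     * never mistakes a zero-byte read for EOF.
     */
    static long copy(InputStream in, OutputStream out) throws IOException {
        final byte[] buffer = new byte[32 * 1024];
        long numBytes = 0L;
        int numRead;
        while ((numRead = in.read(buffer)) != -1) {
            out.write(buffer, 0, numRead);
            numBytes += numRead;
        }
        return numBytes;
    }

    public static void main(String[] args) throws IOException {
        byte[] data = "hello, s3".getBytes();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        System.out.println(copy(new ByteArrayInputStream(data), out));
    }
}
```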

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Cleanup small objects uploaded to S3 as independent objects
> -----------------------------------------------------------
>
>                 Key: FLINK-10963
>                 URL: https://issues.apache.org/jira/browse/FLINK-10963
>             Project: Flink
>          Issue Type: Sub-task
>          Components: filesystem-connector
>    Affects Versions: 1.7.0
>            Reporter: Kostas Kloudas
>            Assignee: Kostas Kloudas
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.7.1
>
>
> The S3 {{RecoverableWriter}} uses the Multipart Upload (MPU) feature of S3 to upload the different part files. This means that a large part file is split into chunks of at least 5 MB, each of which is uploaded independently as soon as it is ready.
> The 5 MB minimum size requires special handling for parts smaller than 5 MB when a checkpoint barrier arrives. These small parts are uploaded as independent S3 objects, not associated with any active MPU, so that when Flink needs to restore, it simply downloads them and resumes writing to them.
> These small objects are currently never cleaned up, leading to wasted space on S3.
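To make the cleanup idea concrete, here is a hedged sketch of restoring such a small backup object and then removing it. The `S3AccessHelper` interface below is a minimal stand-in modeled on the helper quoted in the diff above; `recoverAndCleanup` and its wiring are assumptions for illustration, not the PR's actual code:

```java
import java.io.File;
import java.io.IOException;

public class BackupCleanupSketch {

    /** Minimal stand-in for the access helper in the diff (hypothetical). */
    interface S3AccessHelper {
        long getObject(String key, File targetLocation) throws IOException;
        boolean deleteObject(String key) throws IOException;
    }

    /**
     * On recovery, download the small backup object to a local file and
     * then delete it from S3, so it no longer lingers as wasted space
     * (the problem this ticket describes). Illustrative only.
     */
    static long recoverAndCleanup(S3AccessHelper helper, String key, File local)
            throws IOException {
        long numBytes = helper.getObject(key, local);
        helper.deleteObject(key); // the cleanup step the PR adds
        return numBytes;
    }
}
```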



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
