Github user trkurc commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/132#discussion_r45944159
  
    --- Diff: 
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
 ---
    @@ -143,23 +316,223 @@ public void process(final InputStream rawIn) throws 
IOException {
                                 objectMetadata.setUserMetadata(userMetadata);
                             }
     
    -                        final PutObjectRequest request = new 
PutObjectRequest(bucket, key, in, objectMetadata);
    -                        
request.setStorageClass(StorageClass.valueOf(context.getProperty(STORAGE_CLASS).getValue()));
    -                        final AccessControlList acl = createACL(context, 
ff);
    -                        if (acl != null) {
    -                            request.setAccessControlList(acl);
    -                        }
    +                        if (ff.getSize() <= multipartThreshold) {
    +                            //----------------------------------------
    +                            // single part upload
    +                            //----------------------------------------
    +                            final PutObjectRequest request = new 
PutObjectRequest(bucket, key, in, objectMetadata);
    +                            request.setStorageClass(
    +                                    
StorageClass.valueOf(context.getProperty(STORAGE_CLASS).getValue()));
    +                            final AccessControlList acl = 
createACL(context, ff);
    +                            if (acl != null) {
    +                                request.setAccessControlList(acl);
    +                            }
     
    -                        final PutObjectResult result = 
s3.putObject(request);
    -                        if (result.getVersionId() != null) {
    -                            attributes.put("s3.version", 
result.getVersionId());
    -                        }
    +                            try {
    +                                final PutObjectResult result = 
s3.putObject(request);
    +                                if (result.getVersionId() != null) {
    +                                    attributes.put(S3_VERSION_ATTR_KEY, 
result.getVersionId());
    +                                }
    +                                if (result.getETag() != null) {
    +                                    attributes.put(S3_ETAG_ATTR_KEY, 
result.getETag());
    +                                }
    +                                if (result.getExpirationTime() != null) {
    +                                    attributes.put(S3_EXPIRATION_ATTR_KEY, 
result.getExpirationTime().toString());
    +                                }
    +                                if 
(result.getMetadata().getRawMetadata().keySet().contains(S3_STORAGECLASS_META_KEY))
 {
    +                                    
attributes.put(S3_STORAGECLASS_ATTR_KEY,
    +                                            
result.getMetadata().getRawMetadataValue(S3_STORAGECLASS_META_KEY).toString());
    +                                }
    +                                if (userMetadata.size() > 0) {
    +                                    StringBuilder userMetaBldr = new 
StringBuilder();
    +                                    for (String userKey : 
userMetadata.keySet()) {
    +                                        
userMetaBldr.append(userKey).append("=").append(userMetadata.get(userKey));
    +                                    }
    +                                    attributes.put(S3_USERMETA_ATTR_KEY, 
userMetaBldr.toString());
    +                                }
    +                            } catch (AmazonClientException e) {
    +                                getLogger().info("Failure completing 
upload flowfile={} bucket={} key={} reason={}",
    +                                        new Object[]{ffFilename, bucket, 
key, e.getMessage()});
    +                                throw (e);
    +                            }
    +                        } else {
    +                            //----------------------------------------
    +                            // multipart upload
    +                            //----------------------------------------
     
    -                        attributes.put("s3.etag", result.getETag());
    +                            // load or create persistent state
    +                            
//------------------------------------------------------------
    +                            MultipartState currentState;
    +                            try {
    +                                currentState = getState(cacheKey);
    +                                if (currentState != null) {
    +                                    if (currentState.getPartETags().size() 
> 0) {
    +                                        final PartETag lastETag = 
currentState.getPartETags().get(
    +                                                
currentState.getPartETags().size() - 1);
    +                                        getLogger().info("Resuming upload 
for flowfile='{}' bucket='{}' key='{}' " +
    +                                                "uploadID='{}' 
filePosition='{}' partSize='{}' storageClass='{}' " +
    +                                                "contentLength='{}' 
partsLoaded={} lastPart={}/{}",
    +                                                new Object[]{ffFilename, 
bucket, key, currentState.getUploadId(),
    +                                                        
currentState.getFilePosition(), currentState.getPartSize(),
    +                                                        
currentState.getStorageClass().toString(),
    +                                                        
currentState.getContentLength(),
    +                                                        
currentState.getPartETags().size(),
    +                                                        
Integer.toString(lastETag.getPartNumber()),
    +                                                        
lastETag.getETag()});
    +                                    } else {
    +                                        getLogger().info("Resuming upload 
for flowfile='{}' bucket='{}' key='{}' " +
    +                                                "uploadID='{}' 
filePosition='{}' partSize='{}' storageClass='{}' " +
    +                                                "contentLength='{}' no 
partsLoaded",
    +                                                new Object[]{ffFilename, 
bucket, key, currentState.getUploadId(),
    +                                                        
currentState.getFilePosition(), currentState.getPartSize(),
    +                                                        
currentState.getStorageClass().toString(),
    +                                                        
currentState.getContentLength()});
    +                                    }
    +                                } else {
    +                                    currentState = new MultipartState();
    +                                    
currentState.setPartSize(multipartPartSize);
    +                                    currentState.setStorageClass(
    +                                            
StorageClass.valueOf(context.getProperty(STORAGE_CLASS).getValue()));
    +                                    
currentState.setContentLength(ff.getSize());
    +                                    persistState(cacheKey, currentState);
    +                                    getLogger().info("Starting new upload 
for flowfile='{}' bucket='{}' key='{}'",
    +                                            new Object[]{ffFilename, 
bucket, key});
    +                                }
    +                            } catch (IOException e) {
    +                                getLogger().error("IOException initiating 
cache state while processing flow files: " +
    +                                        e.getMessage());
    +                                throw (e);
    +                            }
    +
    +                            // initiate multipart upload or find position 
in file
    +                            
//------------------------------------------------------------
    +                            if (currentState.getUploadId().isEmpty()) {
    +                                final InitiateMultipartUploadRequest 
initiateRequest =
    +                                        new 
InitiateMultipartUploadRequest(bucket, key, objectMetadata);
    +                                
initiateRequest.setStorageClass(currentState.getStorageClass());
    +                                final AccessControlList acl = 
createACL(context, ff);
    +                                if (acl != null) {
    +                                    
initiateRequest.setAccessControlList(acl);
    +                                }
    +                                try {
    +                                    final InitiateMultipartUploadResult 
initiateResult =
    +                                            
s3.initiateMultipartUpload(initiateRequest);
    +                                    
currentState.setUploadId(initiateResult.getUploadId());
    +                                    currentState.getPartETags().clear();
    +                                    try {
    +                                        persistState(cacheKey, 
currentState);
    +                                    } catch (Exception e) {
    +                                        getLogger().info("Exception saving 
cache state while processing flow file: " +
    +                                                e.getMessage());
    +                                        throw(new 
ProcessException("Exception saving cache state", e));
    +                                    }
    +                                    getLogger().info("Success initiating 
upload flowfile={} available={} position={} " +
    +                                            "length={} bucket={} key={} 
uploadId={}",
    +                                            new Object[]{ffFilename, 
in.available(), currentState.getFilePosition(),
    +                                                    
currentState.getContentLength(), bucket, key,
    +                                                    
currentState.getUploadId()});
    +                                    if (initiateResult.getUploadId() != 
null) {
    +                                        
attributes.put(S3_UPLOAD_ID_ATTR_KEY, initiateResult.getUploadId());
    +                                    }
    +                                } catch (AmazonClientException e) {
    +                                    getLogger().info("Failure initiating 
upload flowfile={} bucket={} key={} reason={}",
    +                                            new Object[]{ffFilename, 
bucket, key, e.getMessage()});
    +                                    throw(e);
    +                                }
    +                            } else {
    +                                if (currentState.getFilePosition() > 0) {
    +                                    try {
    +                                        final long skipped = 
in.skip(currentState.getFilePosition());
    +                                        if (skipped != 
currentState.getFilePosition()) {
    +                                            getLogger().info("Failure 
skipping to resume upload flowfile={} " +
    +                                                    "bucket={} key={} 
position={} skipped={}",
    +                                                    new 
Object[]{ffFilename, bucket, key,
    +                                                            
currentState.getFilePosition(), skipped});
    +                                        }
    +                                    } catch (Exception e) {
    +                                        getLogger().info("Failure skipping 
to resume upload flowfile={} bucket={} " +
    +                                                "key={} position={} 
reason={}",
    +                                                new Object[]{ffFilename, 
bucket, key, currentState.getFilePosition(),
    +                                                        e.getMessage()});
    +                                        throw(new ProcessException(e));
    +                                    }
    +                                }
    +                            }
    +
    +                            // upload parts
    +                            
//------------------------------------------------------------
    +                            long thisPartSize;
    +                            for (int part = 
currentState.getPartETags().size() + 1;
    +                                 currentState.getFilePosition() < 
currentState.getContentLength(); part++) {
    +                                if (!PutS3Object.this.isScheduled()) {
    +                                    getLogger().info("Processor 
unscheduled, stopping upload flowfile={} part={} " +
    +                                            "uploadId={}", new 
Object[]{ffFilename, part, currentState.getUploadId()});
    +                                    session.rollback();
    +                                    return;
    +                                }
    +                                thisPartSize = 
Math.min(currentState.getPartSize(),
    +                                        (currentState.getContentLength() - 
currentState.getFilePosition()));
    +                                UploadPartRequest uploadRequest = new 
UploadPartRequest()
    +                                        .withBucketName(bucket)
    +                                        .withKey(key)
    +                                        
.withUploadId(currentState.getUploadId())
    +                                        .withInputStream(in)
    +                                        .withPartNumber(part)
    +                                        .withPartSize(thisPartSize);
    +                                try {
    +                                    UploadPartResult uploadPartResult = 
s3.uploadPart(uploadRequest);
    +                                    
currentState.addPartETag(uploadPartResult.getPartETag());
    +                                    
currentState.setFilePosition(currentState.getFilePosition() + thisPartSize);
    +                                    try {
    +                                        persistState(cacheKey, 
currentState);
    +                                    } catch (Exception e) {
    --- End diff --
    
    I'd add a comment here explaining why swallowing exceptions at this point is
okay, as it is not immediately intuitive.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to