Github user trkurc commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/132#discussion_r45944159

--- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java ---
@@ -143,23 +316,223 @@ public void process(final InputStream rawIn) throws IOException {
             objectMetadata.setUserMetadata(userMetadata);
         }
 
-        final PutObjectRequest request = new PutObjectRequest(bucket, key, in, objectMetadata);
-        request.setStorageClass(StorageClass.valueOf(context.getProperty(STORAGE_CLASS).getValue()));
-        final AccessControlList acl = createACL(context, ff);
-        if (acl != null) {
-            request.setAccessControlList(acl);
-        }
+        if (ff.getSize() <= multipartThreshold) {
+            //----------------------------------------
+            // single part upload
+            //----------------------------------------
+            final PutObjectRequest request = new PutObjectRequest(bucket, key, in, objectMetadata);
+            request.setStorageClass(
+                    StorageClass.valueOf(context.getProperty(STORAGE_CLASS).getValue()));
+            final AccessControlList acl = createACL(context, ff);
+            if (acl != null) {
+                request.setAccessControlList(acl);
+            }
 
-        final PutObjectResult result = s3.putObject(request);
-        if (result.getVersionId() != null) {
-            attributes.put("s3.version", result.getVersionId());
-        }
+            try {
+                final PutObjectResult result = s3.putObject(request);
+                if (result.getVersionId() != null) {
+                    attributes.put(S3_VERSION_ATTR_KEY, result.getVersionId());
+                }
+                if (result.getETag() != null) {
+                    attributes.put(S3_ETAG_ATTR_KEY, result.getETag());
+                }
+                if (result.getExpirationTime() != null) {
+                    attributes.put(S3_EXPIRATION_ATTR_KEY, result.getExpirationTime().toString());
+                }
+                if (result.getMetadata().getRawMetadata().keySet().contains(S3_STORAGECLASS_META_KEY)) {
+                    attributes.put(S3_STORAGECLASS_ATTR_KEY,
+                            result.getMetadata().getRawMetadataValue(S3_STORAGECLASS_META_KEY).toString());
+                }
+                if (userMetadata.size() > 0) {
+                    StringBuilder userMetaBldr = new StringBuilder();
+                    for (String userKey : userMetadata.keySet()) {
+                        userMetaBldr.append(userKey).append("=").append(userMetadata.get(userKey));
+                    }
+                    attributes.put(S3_USERMETA_ATTR_KEY, userMetaBldr.toString());
+                }
+            } catch (AmazonClientException e) {
+                getLogger().info("Failure completing upload flowfile={} bucket={} key={} reason={}",
+                        new Object[]{ffFilename, bucket, key, e.getMessage()});
+                throw (e);
+            }
+        } else {
+            //----------------------------------------
+            // multipart upload
+            //----------------------------------------
 
-        attributes.put("s3.etag", result.getETag());
+            // load or create persistent state
+            //------------------------------------------------------------
+            MultipartState currentState;
+            try {
+                currentState = getState(cacheKey);
+                if (currentState != null) {
+                    if (currentState.getPartETags().size() > 0) {
+                        final PartETag lastETag = currentState.getPartETags().get(
+                                currentState.getPartETags().size() - 1);
+                        getLogger().info("Resuming upload for flowfile='{}' bucket='{}' key='{}' " +
+                                "uploadID='{}' filePosition='{}' partSize='{}' storageClass='{}' " +
+                                "contentLength='{}' partsLoaded={} lastPart={}/{}",
+                                new Object[]{ffFilename, bucket, key, currentState.getUploadId(),
+                                        currentState.getFilePosition(), currentState.getPartSize(),
+                                        currentState.getStorageClass().toString(),
+                                        currentState.getContentLength(),
+                                        currentState.getPartETags().size(),
+                                        Integer.toString(lastETag.getPartNumber()),
+                                        lastETag.getETag()});
+                    } else {
+                        getLogger().info("Resuming upload for flowfile='{}' bucket='{}' key='{}' " +
+                                "uploadID='{}' filePosition='{}' partSize='{}' storageClass='{}' " +
+                                "contentLength='{}' no partsLoaded",
+                                new Object[]{ffFilename, bucket, key, currentState.getUploadId(),
+                                        currentState.getFilePosition(), currentState.getPartSize(),
+                                        currentState.getStorageClass().toString(),
+                                        currentState.getContentLength()});
+                    }
+                } else {
+                    currentState = new MultipartState();
+                    currentState.setPartSize(multipartPartSize);
+                    currentState.setStorageClass(
+                            StorageClass.valueOf(context.getProperty(STORAGE_CLASS).getValue()));
+                    currentState.setContentLength(ff.getSize());
+                    persistState(cacheKey, currentState);
+                    getLogger().info("Starting new upload for flowfile='{}' bucket='{}' key='{}'",
+                            new Object[]{ffFilename, bucket, key});
+                }
+            } catch (IOException e) {
+                getLogger().error("IOException initiating cache state while processing flow files: " +
+                        e.getMessage());
+                throw (e);
+            }
+
+            // initiate multipart upload or find position in file
+            //------------------------------------------------------------
+            if (currentState.getUploadId().isEmpty()) {
+                final InitiateMultipartUploadRequest initiateRequest =
+                        new InitiateMultipartUploadRequest(bucket, key, objectMetadata);
+                initiateRequest.setStorageClass(currentState.getStorageClass());
+                final AccessControlList acl = createACL(context, ff);
+                if (acl != null) {
+                    initiateRequest.setAccessControlList(acl);
+                }
+                try {
+                    final InitiateMultipartUploadResult initiateResult =
+                            s3.initiateMultipartUpload(initiateRequest);
+                    currentState.setUploadId(initiateResult.getUploadId());
+                    currentState.getPartETags().clear();
+                    try {
+                        persistState(cacheKey, currentState);
+                    } catch (Exception e) {
+                        getLogger().info("Exception saving cache state while processing flow file: " +
+                                e.getMessage());
+                        throw(new ProcessException("Exception saving cache state", e));
+                    }
+                    getLogger().info("Success initiating upload flowfile={} available={} position={} " +
+                            "length={} bucket={} key={} uploadId={}",
+                            new Object[]{ffFilename, in.available(), currentState.getFilePosition(),
+                                    currentState.getContentLength(), bucket, key,
+                                    currentState.getUploadId()});
+                    if (initiateResult.getUploadId() != null) {
+                        attributes.put(S3_UPLOAD_ID_ATTR_KEY, initiateResult.getUploadId());
+                    }
+                } catch (AmazonClientException e) {
+                    getLogger().info("Failure initiating upload flowfile={} bucket={} key={} reason={}",
+                            new Object[]{ffFilename, bucket, key, e.getMessage()});
+                    throw(e);
+                }
+            } else {
+                if (currentState.getFilePosition() > 0) {
+                    try {
+                        final long skipped = in.skip(currentState.getFilePosition());
+                        if (skipped != currentState.getFilePosition()) {
+                            getLogger().info("Failure skipping to resume upload flowfile={} " +
+                                    "bucket={} key={} position={} skipped={}",
+                                    new Object[]{ffFilename, bucket, key,
+                                            currentState.getFilePosition(), skipped});
+                        }
+                    } catch (Exception e) {
+                        getLogger().info("Failure skipping to resume upload flowfile={} bucket={} " +
+                                "key={} position={} reason={}",
+                                new Object[]{ffFilename, bucket, key, currentState.getFilePosition(),
+                                        e.getMessage()});
+                        throw(new ProcessException(e));
+                    }
+                }
+            }
+
+            // upload parts
+            //------------------------------------------------------------
+            long thisPartSize;
+            for (int part = currentState.getPartETags().size() + 1;
+                    currentState.getFilePosition() < currentState.getContentLength(); part++) {
+                if (!PutS3Object.this.isScheduled()) {
+                    getLogger().info("Processor unscheduled, stopping upload flowfile={} part={} " +
+                            "uploadId={}", new Object[]{ffFilename, part, currentState.getUploadId()});
+                    session.rollback();
+                    return;
+                }
+                thisPartSize = Math.min(currentState.getPartSize(),
+                        (currentState.getContentLength() - currentState.getFilePosition()));
+                UploadPartRequest uploadRequest = new UploadPartRequest()
+                        .withBucketName(bucket)
+                        .withKey(key)
+                        .withUploadId(currentState.getUploadId())
+                        .withInputStream(in)
+                        .withPartNumber(part)
+                        .withPartSize(thisPartSize);
+                try {
+                    UploadPartResult uploadPartResult = s3.uploadPart(uploadRequest);
+                    currentState.addPartETag(uploadPartResult.getPartETag());
+                    currentState.setFilePosition(currentState.getFilePosition() + thisPartSize);
+                    try {
+                        persistState(cacheKey, currentState);
+                    } catch (Exception e) {
--- End diff --

I'd add a comment here on why swallowing exceptions is okay, as it is not immediately intuitive.
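To make the intent concrete, the swallow could carry its own rationale, roughly as in the sketch below. This is only an illustration of the kind of comment being asked for, not the PR's actual code: it assumes (based on the surrounding diff) that a failed persistState only costs resumability, since the part was already uploaded and currentState still holds its PartETag in memory.

```java
try {
    persistState(cacheKey, currentState);
} catch (Exception e) {
    // Swallowed deliberately (assumption, not verified against the PR):
    // the part upload already succeeded and currentState still holds the
    // new PartETag in memory, so the in-flight upload is unaffected. A
    // persist failure only means a later resume would restart from the
    // last state that WAS saved and re-upload some parts. Log it so the
    // condition stays visible, but do not fail the flow file.
    getLogger().info("Exception saving cache state while processing flow file: " +
            e.getMessage());
}
```

If that assumption holds, the worst case is wasted re-upload work rather than a corrupt or incomplete S3 object, which would justify logging without rethrowing.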