Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/121#discussion_r44713942
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3ObjectMultipart.java ---
    @@ -0,0 +1,550 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.s3;
    +
    +import com.amazonaws.AmazonClientException;
    +import com.amazonaws.services.s3.AmazonS3;
    +import com.amazonaws.services.s3.AmazonS3Client;
    +import com.amazonaws.services.s3.model.AccessControlList;
    +import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
    +import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
    +import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
    +import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
    +import com.amazonaws.services.s3.model.ObjectMetadata;
    +import com.amazonaws.services.s3.model.PartETag;
    +import com.amazonaws.services.s3.model.StorageClass;
    +import com.amazonaws.services.s3.model.UploadPartRequest;
    +import com.amazonaws.services.s3.model.UploadPartResult;
    +import org.apache.nifi.annotation.behavior.DynamicProperty;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.ReadsAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.DataUnit;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.stream.io.BufferedInputStream;
    +
    +import java.io.File;
    +import java.io.FileInputStream;
    +import java.io.FileOutputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.Serializable;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Properties;
    +import java.util.concurrent.TimeUnit;
    +
    +@SeeAlso({FetchS3Object.class, PutS3Object.class, DeleteS3Object.class})
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"Amazon", "S3", "AWS", "Archive", "Put", "Multi", "Multipart", 
"Upload"})
    +@CapabilityDescription("Puts FlowFiles to an Amazon S3 Bucket using the 
MultipartUpload API method.  " +
    +        "This upload consists of three steps 1) initiate upload, 2) upload 
the parts, and 3) complete the upload.\n" +
    +        "Since the intent for this processor involves large files, the 
processor saves state locally after each step " +
    +        "so that an upload can be resumed without having to restart from 
the beginning of the file.\n" +
    +        "The AWS libraries default to using standard AWS regions but the 
'Endpoint Override URL' allows this to be " +
    +        "overridden.")
    +@DynamicProperty(name = "The name of a User-Defined Metadata field to add to the S3 Object",
    +        value = "The value of a User-Defined Metadata field to add to the S3 Object",
    +        description = "Allows user-defined metadata to be added to the S3 object as key/value pairs",
    +        supportsExpressionLanguage = true)
    +@ReadsAttribute(attribute = "filename", description = "Uses the FlowFile's filename as the filename for the S3 object")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = "s3.bucket", description = "The S3 
bucket where the Object was put in S3"),
    +        @WritesAttribute(attribute = "s3.key", description = "The S3 key 
within where the Object was put in S3"),
    +        @WritesAttribute(attribute = "s3.version", description = "The 
version of the S3 Object that was put to S3"),
    +        @WritesAttribute(attribute = "s3.etag", description = "The ETag of 
the S3 Object"),
    +        @WritesAttribute(attribute = "s3.uploadId", description = "The 
uploadId used to upload the Object to S3"),
    +        @WritesAttribute(attribute = "s3.expiration", description = "A 
human-readable form of the expiration date of " +
    +                "the S3 object, if one is set"),
    +        @WritesAttribute(attribute = "s3.usermetadata", description = "A 
human-readable form of the User Metadata " +
    +                "of the S3 object, if any was set")
    +})
    +public class PutS3ObjectMultipart extends AbstractS3Processor {
    +
    +    public static final long MIN_BYTES_INCLUSIVE = 50L * 1024L * 1024L;
    +    public static final long MAX_BYTES_INCLUSIVE = 5L * 1024L * 1024L * 1024L;
    +    public static final String PERSISTENCE_ROOT = "conf/state/";
    +
    +    public static final PropertyDescriptor EXPIRATION_RULE_ID = new PropertyDescriptor.Builder()
    +            .name("Expiration Time Rule")
    +            .required(false)
    +            .expressionLanguageSupported(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor STORAGE_CLASS = new PropertyDescriptor.Builder()
    +            .name("Storage Class")
    +            .required(true)
    +            .allowableValues(StorageClass.Standard.name(), StorageClass.ReducedRedundancy.name())
    +            .defaultValue(StorageClass.Standard.name())
    +            .build();
    +
    +    public static final PropertyDescriptor PART_SIZE = new PropertyDescriptor.Builder()
    +            .name("Part Size")
    +            .description("Specifies the Part Size to be used for the S3 Multipart Upload API.  The flow file will be " +
    +                    "broken into Part Size chunks during upload.  Part size must be at least 50MB and no more than " +
    +                    "5GB, but the final part can be less than 50MB.")
    +            .required(true)
    +            .defaultValue("5 GB")
    +            .addValidator(StandardValidators.createDataSizeBoundsValidator(MIN_BYTES_INCLUSIVE, MAX_BYTES_INCLUSIVE))
    +            .build();
    +
    +    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
    +            Arrays.asList(KEY, BUCKET, PART_SIZE, ENDPOINT_OVERRIDE, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE,
    +                    SSL_CONTEXT_SERVICE, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID,
    +                    FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER));
    +
    +    final static String S3_BUCKET_KEY = "s3.bucket";
    +    final static String S3_OBJECT_KEY = "s3.key";
    +    final static String S3_UPLOAD_ID_ATTR_KEY = "s3.uploadId";
    +    final static String S3_VERSION_ATTR_KEY = "s3.version";
    +    final static String S3_ETAG_ATTR_KEY = "s3.etag";
    +    final static String S3_EXPIRATION_ATTR_KEY = "s3.expiration";
    +    final static String S3_USERMETA_ATTR_KEY = "s3.usermetadata";
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +        return new PropertyDescriptor.Builder()
    +                .name(propertyDescriptorName)
    +                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +                .expressionLanguageSupported(true)
    +                .dynamic(true)
    +                .build();
    +    }
    +
    +    protected File getPersistenceFile() {
    --- End diff --
    
    I guess the same comment about making this _private_ would apply here and to other similar methods.
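    
    For illustration, the suggested change would look roughly like this (just a sketch, assuming no subclass or test needs to override these methods):
    
        private File getPersistenceFile() {
            // ... body unchanged; only the visibility modifier narrows ...
        }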

