Repository: nifi
Updated Branches:
  refs/heads/master f6d342795 -> 49ee06b0a


[NIFI-774] Create a DeleteS3Object Processor


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d1dbd376
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d1dbd376
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d1dbd376

Branch: refs/heads/master
Commit: d1dbd37629baeb9a1570f47f7583d07f54b38b5c
Parents: e4e263c
Author: Yu ISHIKAWA <yuu.ishik...@gmail.com>
Authored: Thu Jul 23 13:13:15 2015 +0900
Committer: Yuu ISHIKAWA <yuu.ishik...@gmail.com>
Committed: Tue Sep 1 22:35:11 2015 +0900

----------------------------------------------------------------------
 .../nifi/processors/aws/s3/DeleteS3Object.java  | 108 +++++++++++++++
 .../org.apache.nifi.processor.Processor         |   1 +
 .../processors/aws/s3/TestDeleteS3Object.java   | 137 +++++++++++++++++++
 3 files changed, 246 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/d1dbd376/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
new file mode 100644
index 0000000..3be7a15
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.s3;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.DeleteObjectRequest;
+import com.amazonaws.services.s3.model.DeleteVersionRequest;
+import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
+
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.util.StandardValidators;
+
+
+@SupportsBatching
+@SeeAlso({PutS3Object.class})
+@Tags({"Amazon", "S3", "AWS", "Archive", "Delete"})
+@CapabilityDescription("Deletes FlowFiles on an Amazon S3 Bucket. " +
+        "And the FlowFiles are checked if exists or not before deleting.")
+public class DeleteS3Object extends AbstractS3Processor {
+
+    public static final PropertyDescriptor VERSION_ID = new 
PropertyDescriptor.Builder()
+            .name("Version")
+            .description("The Version of the Object to delete")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .required(false)
+            .build();
+
+    public static final List<PropertyDescriptor> properties = 
Collections.unmodifiableList(
+            Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, 
CREDENTAILS_FILE, REGION, TIMEOUT, VERSION_ID,
+                    FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, 
READ_ACL_LIST, WRITE_ACL_LIST, OWNER));
+
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final 
String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .name(propertyDescriptorName)
+                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                .expressionLanguageSupported(true)
+                .dynamic(true)
+                .build();
+    }
+
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final long startNanos = System.nanoTime();
+
+        final String bucket = 
context.getProperty(BUCKET).evaluateAttributeExpressions(flowFile).getValue();
+        final String key = 
context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
+        final String versionId = 
context.getProperty(VERSION_ID).evaluateAttributeExpressions(flowFile).getValue();
+
+        final AmazonS3 s3 = getClient();
+        try {
+            // Checks if the key exists or not
+            // If there is no such a key, then throws a exception
+            s3.getObjectMetadata(bucket, key);
+
+            // Deletes a key on Amazon S3
+            if (versionId == null) {
+                final DeleteObjectRequest r = new DeleteObjectRequest(bucket, 
key);
+                s3.deleteObject(r);
+            } else {
+                final DeleteVersionRequest r = new 
DeleteVersionRequest(bucket, key, versionId);
+                s3.deleteVersion(r);
+            }
+        } catch (final AmazonServiceException ase) {
+            getLogger().error("Failed to delete S3 Object for {}; routing to 
failure", new Object[]{flowFile, ase});
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        session.transfer(flowFile, REL_SUCCESS);
+        final long transferMillis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+        getLogger().info("Successfully delete S3 Object for {} in {} millis; 
routing to success", new Object[]{flowFile, transferMillis});
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/d1dbd376/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 4f2405c..d0d1e73 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -14,6 +14,7 @@
 # limitations under the License.
 org.apache.nifi.processors.aws.s3.FetchS3Object
 org.apache.nifi.processors.aws.s3.PutS3Object
+org.apache.nifi.processors.aws.s3.DeleteS3Object
 org.apache.nifi.processors.aws.sns.PutSNS
 org.apache.nifi.processors.aws.sqs.GetSQS
 org.apache.nifi.processors.aws.sqs.PutSQS

http://git-wip-us.apache.org/repos/asf/nifi/blob/d1dbd376/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestDeleteS3Object.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestDeleteS3Object.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestDeleteS3Object.java
new file mode 100644
index 0000000..dfe6edb
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestDeleteS3Object.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.s3;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import com.amazonaws.auth.PropertiesCredentials;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.*;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+
+
+@Ignore("For local testing only - interacts with S3 so the credentials file 
must be configured and all necessary buckets created")
+public class TestDeleteS3Object {
+
+    private static final String CREDENTIALS_FILE = 
System.getProperty("user.home") + "/aws-credentials.properties";
+
+    // When you want to test this, you should create a bucket on Amazon S3 as 
follows.
+    private static final String TEST_REGION = "ap-northeast-1";
+    private static final String TEST_BUCKET = 
"test-bucket-00000000-0000-0000-0000-1234567890123";
+
+    @BeforeClass
+    public static void oneTimeSetUp() {
+        // Creates a new bucket for this test
+        try {
+            PropertiesCredentials credentials = new PropertiesCredentials(new 
FileInputStream(CREDENTIALS_FILE));
+            AmazonS3Client client = new AmazonS3Client(credentials);
+            CreateBucketRequest request = new CreateBucketRequest(TEST_BUCKET, 
TEST_REGION);
+            client.createBucket(request);
+        } catch (final AmazonS3Exception e) {
+            System.out.println(TEST_BUCKET + " already exists.");
+        } catch (final IOException e) {
+            System.out.println(CREDENTIALS_FILE + " doesn't exist.");
+        }
+    }
+
+    @AfterClass
+    public static void oneTimeTearDown() throws IOException {
+        // Delete a bucket for this test
+        PropertiesCredentials credentials = new PropertiesCredentials(new 
FileInputStream(CREDENTIALS_FILE));
+        AmazonS3Client client = new AmazonS3Client(credentials);
+        DeleteBucketRequest dbr = new DeleteBucketRequest(TEST_BUCKET);
+        client.deleteBucket(dbr);
+    }
+
+    @Test
+    public void testSimpleDelete() throws IOException {
+        // Prepares for this test
+        uploadTestFile("hello.txt");
+
+        DeleteS3Object deleter = new DeleteS3Object();
+        final TestRunner runner = TestRunners.newTestRunner(deleter);
+        runner.setProperty(DeleteS3Object.CREDENTAILS_FILE, CREDENTIALS_FILE);
+        runner.setProperty(DeleteS3Object.REGION, TEST_REGION);
+        runner.setProperty(DeleteS3Object.BUCKET, TEST_BUCKET);
+        runner.setProperty(DeleteS3Object.KEY, "hello.txt");
+
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", "hello.txt");
+        runner.enqueue(new byte[0], attrs);
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(DeleteS3Object.REL_SUCCESS, 1);
+    }
+
+    @Test
+    public void testDeleteFolder() throws IOException {
+        // Prepares for this test
+        uploadTestFile("folder/1.txt");
+
+        DeleteS3Object deleter = new DeleteS3Object();
+        final TestRunner runner = TestRunners.newTestRunner(deleter);
+        runner.setProperty(DeleteS3Object.CREDENTAILS_FILE, CREDENTIALS_FILE);
+        runner.setProperty(DeleteS3Object.REGION, TEST_REGION);
+        runner.setProperty(DeleteS3Object.BUCKET, TEST_BUCKET);
+        runner.setProperty(DeleteS3Object.KEY, "folder/1.txt");
+
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", "hello.txt");
+        runner.enqueue(new byte[0], attrs);
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(DeleteS3Object.REL_SUCCESS, 1);
+    }
+
+    @Test
+    public void testTryToDeleteNotExistingFile() throws IOException {
+        DeleteS3Object deleter = new DeleteS3Object();
+        final TestRunner runner = TestRunners.newTestRunner(deleter);
+        runner.setProperty(DeleteS3Object.CREDENTAILS_FILE, CREDENTIALS_FILE);
+        runner.setProperty(DeleteS3Object.REGION, TEST_REGION);
+        runner.setProperty(DeleteS3Object.BUCKET, TEST_BUCKET);
+        runner.setProperty(DeleteS3Object.BUCKET, "no-such-a-key");
+
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", "no-such-a-file");
+        runner.enqueue(new byte[0], attrs);
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(DeleteS3Object.REL_FAILURE, 1);
+    }
+
+    // Uploads a test file
+    private void uploadTestFile(String key) throws IOException {
+        PropertiesCredentials credentials = new PropertiesCredentials(new 
FileInputStream(CREDENTIALS_FILE));
+        AmazonS3Client client = new AmazonS3Client(credentials);
+        URL fileURL = 
this.getClass().getClassLoader().getResource("hello.txt");
+        File file = new File(fileURL.getPath());
+        PutObjectRequest putRequest = new PutObjectRequest(TEST_BUCKET, key, 
file);
+        PutObjectResult result = client.putObject(putRequest);
+    }
+}

Reply via email to