[GitHub] nifi pull request: Nifi 1489 (Support for http proxy) + Nifi 1495 ...

2016-02-23 Thread mans2singh
Github user mans2singh commented on a diff in the pull request:

https://github.com/apache/nifi/pull/213#discussion_r53893312
  
--- Diff: 
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/firehose/ITPutKinesisFirehose.java
 ---
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis.firehose;
+
+import java.util.List;
+
+import org.apache.nifi.processors.aws.s3.FetchS3Object;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * This test contains both unit and integration test (integration tests 
are ignored by default)
+ */
+public class ITPutKinesisFirehose {
+
+private TestRunner runner;
+protected final static String CREDENTIALS_FILE = 
System.getProperty("user.home") + "/aws-credentials.properties";
+
+@Before
+public void setUp() throws Exception {
+runner = TestRunners.newTestRunner(PutKinesisFirehose.class);
+runner.setProperty(PutKinesisFirehose.ACCESS_KEY, "abcd");
+runner.setProperty(PutKinesisFirehose.SECRET_KEY, "secret key");
+
runner.setProperty(PutKinesisFirehose.KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, 
"deliveryName");
+runner.assertValid();
+}
+
+@After
+public void tearDown() throws Exception {
+runner = null;
+}
+
+@Test
+public void testCustomValidateBatchSize1Valid() {
+runner.setProperty(PutKinesisFirehose.BATCH_SIZE, "1");
+runner.assertValid();
+}
+
+@Test
+public void testCustomValidateBatchSize500Valid() {
+runner.setProperty(PutKinesisFirehose.BATCH_SIZE, "500");
+runner.assertValid();
+}
+@Test
+public void testCustomValidateBatchSize501InValid() {
+runner.setProperty(PutKinesisFirehose.BATCH_SIZE, "501");
+runner.assertNotValid();
+}
+
+@Test
+public void testCustomValidateBufferSize1Valid() {
+runner.setProperty(PutKinesisFirehose.MAX_BUFFER_SIZE, "1");
+runner.assertValid();
+}
+
+@Test
+public void testCustomValidateBufferSize128Valid() {
+runner.setProperty(PutKinesisFirehose.MAX_BUFFER_SIZE, "128");
+runner.assertValid();
+}
+@Test
+public void testCustomValidateBufferSize129InValid() {
+runner.setProperty(PutKinesisFirehose.MAX_BUFFER_SIZE, "129");
+runner.assertNotValid();
+}
+
+@Test
+public void testCustomValidateBufferInterval900Valid() {
+runner.setProperty(PutKinesisFirehose.MAX_BUFFER_INTERVAL, "900");
+runner.assertValid();
+}
+
+@Test
+public void testCustomValidateBufferInterval60Valid() {
+runner.setProperty(PutKinesisFirehose.MAX_BUFFER_INTERVAL, "60");
+runner.assertValid();
+}
+
+@Test
+public void testCustomValidateBufferInterval901InValid() {
+runner.setProperty(PutKinesisFirehose.MAX_BUFFER_INTERVAL, "901");
+runner.assertNotValid();
+}
+
+@Test
+public void testCustomValidateBufferInterval59InValid() {
+runner.setProperty(PutKinesisFirehose.MAX_BUFFER_INTERVAL, "59");
+runner.assertNotValid();
+}
+
+/**
+ * Comment out ignore for integration tests (requires creds files)
+ */
+@Test
+@Ignore
--- End diff --

Removed ignore annotation for IT tests


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not 

[GitHub] nifi pull request: Nifi 1489 (Support for http proxy) + Nifi 1495 ...

2016-02-23 Thread mans2singh
Github user mans2singh commented on a diff in the pull request:

https://github.com/apache/nifi/pull/213#discussion_r53893277
  
--- Diff: 
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java
 ---
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis.firehose;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient;
+import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchRequest;
+import 
com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResponseEntry;
+import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResult;
+import com.amazonaws.services.kinesisfirehose.model.Record;
+
+@SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"amazon", "aws", "firehose", "kinesis", "put", "stream"})
+@CapabilityDescription("Sends the contents to a specified Amazon Kinesis 
Firehose. "
++ "In order to send data to firehose, the firehose delivery stream 
name has to be specified.")
+@WritesAttributes({
+@WritesAttribute(attribute = "aws.kinesis.firehose.error.message", 
description = "Error message on posting message to AWS Kinesis Firehose"),
+@WritesAttribute(attribute = "aws.kinesis.firehose.error.code", 
description = "Error code for the message when posting to AWS Kinesis 
Firehose"),
+@WritesAttribute(attribute = "aws.kinesis.firehose.record.id", 
description = "Record id of the message posted to Kinesis Firehose")})
+public class PutKinesisFirehose extends AbstractKinesisFirehoseProcessor {
+
+/**
+ * Kinesis put record response error message
+ */
+public static final String AWS_KINESIS_FIREHOSE_ERROR_MESSAGE = 
"aws.kinesis.firehose.error.message";
+
+/**
+ * Kinesis put record response error code
+ */
+public static final String AWS_KINESIS_FIREHOSE_ERROR_CODE = 
"aws.kinesis.firehose.error.code";
+
+/**
+ * Kinesis put record response record id
+ */
+public static final String AWS_KINESIS_FIREHOSE_RECORD_ID = 
"aws.kinesis.firehose.record.id";
+
+public static final List properties = 
Collections.unmodifiableList(
+Arrays.asList(KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, 
MAX_BUFFER_INTERVAL,
+  MAX_BUFFER_SIZE, BATCH_SIZE, REGION, ACCESS_KEY, 
SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT,
+  PROXY_HOST,PROXY_HOST_PORT));
+
+@Override
+protected List getSupportedPropertyDescriptors() {
+return properties;
+}
+
+@Override
+protected Collection customValidate(final 
ValidationContext validationContext) {
+final List problems = new 

[GitHub] nifi pull request: Nifi 1489 (Support for http proxy) + Nifi 1495 ...

2016-02-23 Thread mans2singh
Github user mans2singh commented on a diff in the pull request:

https://github.com/apache/nifi/pull/213#discussion_r53893236
  
--- Diff: 
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java
 ---
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis.firehose;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient;
+import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchRequest;
+import 
com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResponseEntry;
+import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResult;
+import com.amazonaws.services.kinesisfirehose.model.Record;
+
+@SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"amazon", "aws", "firehose", "kinesis", "put", "stream"})
+@CapabilityDescription("Sends the contents to a specified Amazon Kinesis 
Firehose. "
++ "In order to send data to firehose, the firehose delivery stream 
name has to be specified.")
+@WritesAttributes({
+@WritesAttribute(attribute = "aws.kinesis.firehose.error.message", 
description = "Error message on posting message to AWS Kinesis Firehose"),
+@WritesAttribute(attribute = "aws.kinesis.firehose.error.code", 
description = "Error code for the message when posting to AWS Kinesis 
Firehose"),
+@WritesAttribute(attribute = "aws.kinesis.firehose.record.id", 
description = "Record id of the message posted to Kinesis Firehose")})
+public class PutKinesisFirehose extends AbstractKinesisFirehoseProcessor {
+
+/**
+ * Kinesis put record response error message
+ */
+public static final String AWS_KINESIS_FIREHOSE_ERROR_MESSAGE = 
"aws.kinesis.firehose.error.message";
+
+/**
+ * Kinesis put record response error code
+ */
+public static final String AWS_KINESIS_FIREHOSE_ERROR_CODE = 
"aws.kinesis.firehose.error.code";
+
+/**
+ * Kinesis put record response record id
+ */
+public static final String AWS_KINESIS_FIREHOSE_RECORD_ID = 
"aws.kinesis.firehose.record.id";
+
+public static final List properties = 
Collections.unmodifiableList(
+Arrays.asList(KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, 
MAX_BUFFER_INTERVAL,
+  MAX_BUFFER_SIZE, BATCH_SIZE, REGION, ACCESS_KEY, 
SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT,
+  PROXY_HOST,PROXY_HOST_PORT));
+
+@Override
+protected List getSupportedPropertyDescriptors() {
+return properties;
+}
+
+@Override
+protected Collection customValidate(final 
ValidationContext validationContext) {
--- End diff --

I've removed the custom validator code


---
If your project is 

[GitHub] nifi pull request: Nifi 1489 (Support for http proxy) + Nifi 1495 ...

2016-02-23 Thread mans2singh
Github user mans2singh commented on a diff in the pull request:

https://github.com/apache/nifi/pull/213#discussion_r53893209
  
--- Diff: 
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/AbstractKinesisFirehoseProcessor.java
 ---
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis.firehose;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import 
org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient;
+
+/**
+ * This class provides processor the base class for kinesis firehose
+ */
+public abstract class AbstractKinesisFirehoseProcessor extends 
AbstractAWSCredentialsProviderProcessor {
+
+public static final PropertyDescriptor 
KINESIS_FIREHOSE_DELIVERY_STREAM_NAME = new PropertyDescriptor.Builder()
+.name("Amazon Kinesis Firehose Delivery Stream Name")
+.description("The name of kinesis firehose delivery stream")
+.expressionLanguageSupported(false)
+.required(true)
+.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor MAX_BUFFER_INTERVAL = new 
PropertyDescriptor.Builder()
+.name("Max Buffer Interval")
+.description("Buffering interval for messages (between 60 and 
900 seconds).")
+.defaultValue("60")
+.required(false)
+.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+.sensitive(false)
+.build();
+
+public static final PropertyDescriptor MAX_BUFFER_SIZE = new 
PropertyDescriptor.Builder()
+.name("Max Buffer Size (MB)")
+.description("Buffering size for messages (between 1MB and 
128MB).")
+.defaultValue("128")
+.required(false)
+.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+.sensitive(false)
+.build();
+
+public static final PropertyDescriptor BATCH_SIZE = new 
PropertyDescriptor.Builder()
--- End diff --

@apiri - I've corrected this based on your feedback.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] nifi pull request: Nifi 1489 (Support for http proxy) + Nifi 1495 ...

2016-02-23 Thread mans2singh
Github user mans2singh commented on a diff in the pull request:

https://github.com/apache/nifi/pull/213#discussion_r53893190
  
--- Diff: 
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/AbstractKinesisFirehoseProcessor.java
 ---
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis.firehose;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import 
org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient;
+
+/**
+ * This class provides processor the base class for kinesis firehose
+ */
+public abstract class AbstractKinesisFirehoseProcessor extends 
AbstractAWSCredentialsProviderProcessor {
+
+public static final PropertyDescriptor 
KINESIS_FIREHOSE_DELIVERY_STREAM_NAME = new PropertyDescriptor.Builder()
+.name("Amazon Kinesis Firehose Delivery Stream Name")
+.description("The name of kinesis firehose delivery stream")
+.expressionLanguageSupported(false)
+.required(true)
+.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor MAX_BUFFER_INTERVAL = new 
PropertyDescriptor.Builder()
+.name("Max Buffer Interval")
+.description("Buffering interval for messages (between 60 and 
900 seconds).")
+.defaultValue("60")
+.required(false)
+.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
--- End diff --

@apiri - The max buffer size and interval are configuration properties and 
as you pointed out correctly not used in client.  I've removed them and also 
have updated validators to use createLongValidator


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] nifi pull request: Nifi 1489 (Support for http proxy) + Nifi 1495 ...

2016-02-23 Thread apiri
Github user apiri commented on the pull request:

https://github.com/apache/nifi/pull/213#issuecomment-187907196
  
@mans2singh Did an initial look over the code.  The approach seems good but 
would like to understand more about the batching process and how we can perform 
this in a safe manner that does not exhaust heap so readily.  

Also, would you please be able to edit the PR name to something with 
NIFI-1495 in it so that hopefully the JIRA integration will link and include 
those items on the issue.  It is unneeded to keep the reference to NIFI-1489 as 
those changes have been merged and PR closed in #209 

Thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] nifi pull request: Nifi 1489 (Support for http proxy) + Nifi 1495 ...

2016-02-23 Thread apiri
Github user apiri commented on a diff in the pull request:

https://github.com/apache/nifi/pull/213#discussion_r53850051
  
--- Diff: 
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/firehose/ITPutKinesisFirehose.java
 ---
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis.firehose;
+
+import java.util.List;
+
+import org.apache.nifi.processors.aws.s3.FetchS3Object;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * This test contains both unit and integration test (integration tests 
are ignored by default)
+ */
+public class ITPutKinesisFirehose {
+
+private TestRunner runner;
+protected final static String CREDENTIALS_FILE = 
System.getProperty("user.home") + "/aws-credentials.properties";
+
+@Before
+public void setUp() throws Exception {
+runner = TestRunners.newTestRunner(PutKinesisFirehose.class);
+runner.setProperty(PutKinesisFirehose.ACCESS_KEY, "abcd");
+runner.setProperty(PutKinesisFirehose.SECRET_KEY, "secret key");
+
runner.setProperty(PutKinesisFirehose.KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, 
"deliveryName");
+runner.assertValid();
+}
+
+@After
+public void tearDown() throws Exception {
+runner = null;
+}
+
+@Test
+public void testCustomValidateBatchSize1Valid() {
+runner.setProperty(PutKinesisFirehose.BATCH_SIZE, "1");
+runner.assertValid();
+}
+
+@Test
+public void testCustomValidateBatchSize500Valid() {
+runner.setProperty(PutKinesisFirehose.BATCH_SIZE, "500");
+runner.assertValid();
+}
+@Test
+public void testCustomValidateBatchSize501InValid() {
+runner.setProperty(PutKinesisFirehose.BATCH_SIZE, "501");
+runner.assertNotValid();
+}
+
+@Test
+public void testCustomValidateBufferSize1Valid() {
+runner.setProperty(PutKinesisFirehose.MAX_BUFFER_SIZE, "1");
+runner.assertValid();
+}
+
+@Test
+public void testCustomValidateBufferSize128Valid() {
+runner.setProperty(PutKinesisFirehose.MAX_BUFFER_SIZE, "128");
+runner.assertValid();
+}
+@Test
+public void testCustomValidateBufferSize129InValid() {
+runner.setProperty(PutKinesisFirehose.MAX_BUFFER_SIZE, "129");
+runner.assertNotValid();
+}
+
+@Test
+public void testCustomValidateBufferInterval900Valid() {
+runner.setProperty(PutKinesisFirehose.MAX_BUFFER_INTERVAL, "900");
+runner.assertValid();
+}
+
+@Test
+public void testCustomValidateBufferInterval60Valid() {
+runner.setProperty(PutKinesisFirehose.MAX_BUFFER_INTERVAL, "60");
+runner.assertValid();
+}
+
+@Test
+public void testCustomValidateBufferInterval901InValid() {
+runner.setProperty(PutKinesisFirehose.MAX_BUFFER_INTERVAL, "901");
+runner.assertNotValid();
+}
+
+@Test
+public void testCustomValidateBufferInterval59InValid() {
+runner.setProperty(PutKinesisFirehose.MAX_BUFFER_INTERVAL, "59");
+runner.assertNotValid();
+}
+
+/**
+ * Comment out ignore for integration tests (requires creds files)
+ */
+@Test
+@Ignore
+public void testIntegrationSuccess() throws Exception {
+runner = TestRunners.newTestRunner(PutKinesisFirehose.class);
+runner.setProperty(PutKinesisFirehose.CREDENTIALS_FILE, 
CREDENTIALS_FILE);
+

[GitHub] nifi pull request: Nifi 1489 (Support for http proxy) + Nifi 1495 ...

2016-02-23 Thread apiri
Github user apiri commented on a diff in the pull request:

https://github.com/apache/nifi/pull/213#discussion_r53849876
  
--- Diff: 
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/firehose/ITPutKinesisFirehose.java
 ---
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis.firehose;
+
+import java.util.List;
+
+import org.apache.nifi.processors.aws.s3.FetchS3Object;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * This test contains both unit and integration test (integration tests 
are ignored by default)
+ */
+public class ITPutKinesisFirehose {
--- End diff --

customValidate tests and similar, if still needed or desired, should go 
into a standard TestPutKinesisFirehose class so that they are run on each build


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] nifi pull request: Nifi 1489 (Support for http proxy) + Nifi 1495 ...

2016-02-23 Thread apiri
Github user apiri commented on a diff in the pull request:

https://github.com/apache/nifi/pull/213#discussion_r53849994
  
--- Diff: 
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/firehose/ITPutKinesisFirehose.java
 ---
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis.firehose;
+
+import java.util.List;
+
+import org.apache.nifi.processors.aws.s3.FetchS3Object;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * This test contains both unit and integration test (integration tests 
are ignored by default)
+ */
+public class ITPutKinesisFirehose {
+
+private TestRunner runner;
+protected final static String CREDENTIALS_FILE = 
System.getProperty("user.home") + "/aws-credentials.properties";
+
+@Before
+public void setUp() throws Exception {
+runner = TestRunners.newTestRunner(PutKinesisFirehose.class);
+runner.setProperty(PutKinesisFirehose.ACCESS_KEY, "abcd");
+runner.setProperty(PutKinesisFirehose.SECRET_KEY, "secret key");
+
runner.setProperty(PutKinesisFirehose.KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, 
"deliveryName");
+runner.assertValid();
+}
+
+@After
+public void tearDown() throws Exception {
+runner = null;
+}
+
+@Test
+public void testCustomValidateBatchSize1Valid() {
+runner.setProperty(PutKinesisFirehose.BATCH_SIZE, "1");
+runner.assertValid();
+}
+
+@Test
+public void testCustomValidateBatchSize500Valid() {
+runner.setProperty(PutKinesisFirehose.BATCH_SIZE, "500");
+runner.assertValid();
+}
+@Test
+public void testCustomValidateBatchSize501InValid() {
+runner.setProperty(PutKinesisFirehose.BATCH_SIZE, "501");
+runner.assertNotValid();
+}
+
+@Test
+public void testCustomValidateBufferSize1Valid() {
+runner.setProperty(PutKinesisFirehose.MAX_BUFFER_SIZE, "1");
+runner.assertValid();
+}
+
+@Test
+public void testCustomValidateBufferSize128Valid() {
+runner.setProperty(PutKinesisFirehose.MAX_BUFFER_SIZE, "128");
+runner.assertValid();
+}
+@Test
+public void testCustomValidateBufferSize129InValid() {
+runner.setProperty(PutKinesisFirehose.MAX_BUFFER_SIZE, "129");
+runner.assertNotValid();
+}
+
+@Test
+public void testCustomValidateBufferInterval900Valid() {
+runner.setProperty(PutKinesisFirehose.MAX_BUFFER_INTERVAL, "900");
+runner.assertValid();
+}
+
+@Test
+public void testCustomValidateBufferInterval60Valid() {
+runner.setProperty(PutKinesisFirehose.MAX_BUFFER_INTERVAL, "60");
+runner.assertValid();
+}
+
+@Test
+public void testCustomValidateBufferInterval901InValid() {
+runner.setProperty(PutKinesisFirehose.MAX_BUFFER_INTERVAL, "901");
+runner.assertNotValid();
+}
+
+@Test
+public void testCustomValidateBufferInterval59InValid() {
+runner.setProperty(PutKinesisFirehose.MAX_BUFFER_INTERVAL, "59");
+runner.assertNotValid();
+}
+
+/**
+ * Comment out ignore for integration tests (requires creds files)
+ */
+@Test
+@Ignore
--- End diff --

The profile to run ITs is not enabled by default and this Ignore is 
unneeded.  Would prefer to have removed so that no code changes are needed to 
run them.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub 

[GitHub] nifi pull request: Nifi 1489 (Support for http proxy) + Nifi 1495 ...

2016-02-23 Thread apiri
Github user apiri commented on a diff in the pull request:

https://github.com/apache/nifi/pull/213#discussion_r53849365
  
--- Diff: 
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java
 ---
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis.firehose;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient;
+import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchRequest;
+import 
com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResponseEntry;
+import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResult;
+import com.amazonaws.services.kinesisfirehose.model.Record;
+
+@SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"amazon", "aws", "firehose", "kinesis", "put", "stream"})
+@CapabilityDescription("Sends the contents to a specified Amazon Kinesis 
Firehose. "
++ "In order to send data to firehose, the firehose delivery stream 
name has to be specified.")
+@WritesAttributes({
+@WritesAttribute(attribute = "aws.kinesis.firehose.error.message", 
description = "Error message on posting message to AWS Kinesis Firehose"),
+@WritesAttribute(attribute = "aws.kinesis.firehose.error.code", 
description = "Error code for the message when posting to AWS Kinesis 
Firehose"),
+@WritesAttribute(attribute = "aws.kinesis.firehose.record.id", 
description = "Record id of the message posted to Kinesis Firehose")})
+public class PutKinesisFirehose extends AbstractKinesisFirehoseProcessor {
+
+/**
+ * Kinesis put record response error message
+ */
+public static final String AWS_KINESIS_FIREHOSE_ERROR_MESSAGE = 
"aws.kinesis.firehose.error.message";
+
+/**
+ * Kinesis put record response error code
+ */
+public static final String AWS_KINESIS_FIREHOSE_ERROR_CODE = 
"aws.kinesis.firehose.error.code";
+
+/**
+ * Kinesis put record response record id
+ */
+public static final String AWS_KINESIS_FIREHOSE_RECORD_ID = 
"aws.kinesis.firehose.record.id";
+
+public static final List properties = 
Collections.unmodifiableList(
+Arrays.asList(KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, 
MAX_BUFFER_INTERVAL,
+  MAX_BUFFER_SIZE, BATCH_SIZE, REGION, ACCESS_KEY, 
SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT,
+  PROXY_HOST,PROXY_HOST_PORT));
+
+@Override
+protected List getSupportedPropertyDescriptors() {
+return properties;
+}
+
+@Override
+protected Collection customValidate(final 
ValidationContext validationContext) {
+final List problems = new 
ArrayList<>(super.customValidate(validationContext));

[GitHub] nifi pull request: Nifi 1489 (Support for http proxy) + Nifi 1495 ...

2016-02-23 Thread apiri
Github user apiri commented on a diff in the pull request:

https://github.com/apache/nifi/pull/213#discussion_r53846413
  
--- Diff: 
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/firehose/ITPutKinesisFirehose.java
 ---
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis.firehose;
+
+import java.util.List;
+
+import org.apache.nifi.processors.aws.s3.FetchS3Object;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * This test contains both unit and integration test (integration tests 
are ignored by default)
+ */
+public class ITPutKinesisFirehose {
+
+private TestRunner runner;
+protected final static String CREDENTIALS_FILE = 
System.getProperty("user.home") + "/aws-credentials.properties";
+
+@Before
+public void setUp() throws Exception {
+runner = TestRunners.newTestRunner(PutKinesisFirehose.class);
+runner.setProperty(PutKinesisFirehose.ACCESS_KEY, "abcd");
+runner.setProperty(PutKinesisFirehose.SECRET_KEY, "secret key");
+
runner.setProperty(PutKinesisFirehose.KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, 
"deliveryName");
+runner.assertValid();
+}
+
+@After
+public void tearDown() throws Exception {
+runner = null;
+}
+
+@Test
+public void testCustomValidateBatchSize1Valid() {
--- End diff --

These customValidate tests with the above improvements likely add little 
and could be removed with adjusted validators.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] nifi pull request: Nifi 1489 (Support for http proxy) + Nifi 1495 ...

2016-02-23 Thread apiri
Github user apiri commented on a diff in the pull request:

https://github.com/apache/nifi/pull/213#discussion_r53845533
  
--- Diff: 
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java
 ---
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis.firehose;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient;
+import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchRequest;
+import 
com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResponseEntry;
+import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResult;
+import com.amazonaws.services.kinesisfirehose.model.Record;
+
+@SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"amazon", "aws", "firehose", "kinesis", "put", "stream"})
+@CapabilityDescription("Sends the contents to a specified Amazon Kinesis 
Firehose. "
++ "In order to send data to firehose, the firehose delivery stream 
name has to be specified.")
+@WritesAttributes({
+@WritesAttribute(attribute = "aws.kinesis.firehose.error.message", 
description = "Error message on posting message to AWS Kinesis Firehose"),
+@WritesAttribute(attribute = "aws.kinesis.firehose.error.code", 
description = "Error code for the message when posting to AWS Kinesis 
Firehose"),
+@WritesAttribute(attribute = "aws.kinesis.firehose.record.id", 
description = "Record id of the message posted to Kinesis Firehose")})
+public class PutKinesisFirehose extends AbstractKinesisFirehoseProcessor {
+
+/**
+ * Kinesis put record response error message
+ */
+public static final String AWS_KINESIS_FIREHOSE_ERROR_MESSAGE = 
"aws.kinesis.firehose.error.message";
+
+/**
+ * Kinesis put record response error code
+ */
+public static final String AWS_KINESIS_FIREHOSE_ERROR_CODE = 
"aws.kinesis.firehose.error.code";
+
+/**
+ * Kinesis put record response record id
+ */
+public static final String AWS_KINESIS_FIREHOSE_RECORD_ID = 
"aws.kinesis.firehose.record.id";
+
+public static final List properties = 
Collections.unmodifiableList(
+Arrays.asList(KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, 
MAX_BUFFER_INTERVAL,
+  MAX_BUFFER_SIZE, BATCH_SIZE, REGION, ACCESS_KEY, 
SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT,
+  PROXY_HOST,PROXY_HOST_PORT));
+
+@Override
+protected List getSupportedPropertyDescriptors() {
+return properties;
+}
+
+@Override
+protected Collection customValidate(final 
ValidationContext validationContext) {
+final List problems = new 
ArrayList<>(super.customValidate(validationContext));

[GitHub] nifi pull request: Nifi 1489 (Support for http proxy) + Nifi 1495 ...

2016-02-23 Thread apiri
Github user apiri commented on a diff in the pull request:

https://github.com/apache/nifi/pull/213#discussion_r53845261
  
--- Diff: 
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java
 ---
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis.firehose;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient;
+import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchRequest;
+import 
com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResponseEntry;
+import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResult;
+import com.amazonaws.services.kinesisfirehose.model.Record;
+
+@SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"amazon", "aws", "firehose", "kinesis", "put", "stream"})
+@CapabilityDescription("Sends the contents to a specified Amazon Kinesis 
Firehose. "
++ "In order to send data to firehose, the firehose delivery stream 
name has to be specified.")
+@WritesAttributes({
+@WritesAttribute(attribute = "aws.kinesis.firehose.error.message", 
description = "Error message on posting message to AWS Kinesis Firehose"),
+@WritesAttribute(attribute = "aws.kinesis.firehose.error.code", 
description = "Error code for the message when posting to AWS Kinesis 
Firehose"),
+@WritesAttribute(attribute = "aws.kinesis.firehose.record.id", 
description = "Record id of the message posted to Kinesis Firehose")})
+public class PutKinesisFirehose extends AbstractKinesisFirehoseProcessor {
+
+/**
+ * Kinesis put record response error message
+ */
+public static final String AWS_KINESIS_FIREHOSE_ERROR_MESSAGE = 
"aws.kinesis.firehose.error.message";
+
+/**
+ * Kinesis put record response error code
+ */
+public static final String AWS_KINESIS_FIREHOSE_ERROR_CODE = 
"aws.kinesis.firehose.error.code";
+
+/**
+ * Kinesis put record response record id
+ */
+public static final String AWS_KINESIS_FIREHOSE_RECORD_ID = 
"aws.kinesis.firehose.record.id";
+
+public static final List properties = 
Collections.unmodifiableList(
+Arrays.asList(KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, 
MAX_BUFFER_INTERVAL,
+  MAX_BUFFER_SIZE, BATCH_SIZE, REGION, ACCESS_KEY, 
SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT,
--- End diff --

If BUFFER_INTERVAL and BUFFER_SIZE are to go unused, they will also need to 
be removed from here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at 

[GitHub] nifi pull request: Nifi 1489 (Support for http proxy) + Nifi 1495 ...

2016-02-23 Thread apiri
Github user apiri commented on a diff in the pull request:

https://github.com/apache/nifi/pull/213#discussion_r53845184
  
--- Diff: 
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java
 ---
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis.firehose;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient;
+import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchRequest;
+import 
com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResponseEntry;
+import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResult;
+import com.amazonaws.services.kinesisfirehose.model.Record;
+
+@SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"amazon", "aws", "firehose", "kinesis", "put", "stream"})
+@CapabilityDescription("Sends the contents to a specified Amazon Kinesis 
Firehose. "
++ "In order to send data to firehose, the firehose delivery stream 
name has to be specified.")
+@WritesAttributes({
+@WritesAttribute(attribute = "aws.kinesis.firehose.error.message", 
description = "Error message on posting message to AWS Kinesis Firehose"),
+@WritesAttribute(attribute = "aws.kinesis.firehose.error.code", 
description = "Error code for the message when posting to AWS Kinesis 
Firehose"),
+@WritesAttribute(attribute = "aws.kinesis.firehose.record.id", 
description = "Record id of the message posted to Kinesis Firehose")})
+public class PutKinesisFirehose extends AbstractKinesisFirehoseProcessor {
+
+/**
+ * Kinesis put record response error message
+ */
+public static final String AWS_KINESIS_FIREHOSE_ERROR_MESSAGE = 
"aws.kinesis.firehose.error.message";
+
+/**
+ * Kinesis put record response error code
+ */
+public static final String AWS_KINESIS_FIREHOSE_ERROR_CODE = 
"aws.kinesis.firehose.error.code";
+
+/**
+ * Kinesis put record response record id
+ */
+public static final String AWS_KINESIS_FIREHOSE_RECORD_ID = 
"aws.kinesis.firehose.record.id";
+
+public static final List properties = 
Collections.unmodifiableList(
+Arrays.asList(KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, 
MAX_BUFFER_INTERVAL,
+  MAX_BUFFER_SIZE, BATCH_SIZE, REGION, ACCESS_KEY, 
SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT,
+  PROXY_HOST,PROXY_HOST_PORT));
+
+@Override
+protected List getSupportedPropertyDescriptors() {
+return properties;
+}
+
+@Override
+protected Collection customValidate(final 
ValidationContext validationContext) {
--- End diff --

With the alternative validators specified above, it should be 

[GitHub] nifi pull request: Nifi 1489 (Support for http proxy) + Nifi 1495 ...

2016-02-23 Thread apiri
Github user apiri commented on a diff in the pull request:

https://github.com/apache/nifi/pull/213#discussion_r53845121
  
--- Diff: 
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/AbstractKinesisFirehoseProcessor.java
 ---
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis.firehose;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import 
org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient;
+
+/**
+ * This class provides processor the base class for kinesis firehose
+ */
+public abstract class AbstractKinesisFirehoseProcessor extends 
AbstractAWSCredentialsProviderProcessor {
+
+public static final PropertyDescriptor 
KINESIS_FIREHOSE_DELIVERY_STREAM_NAME = new PropertyDescriptor.Builder()
+.name("Amazon Kinesis Firehose Delivery Stream Name")
+.description("The name of kinesis firehose delivery stream")
+.expressionLanguageSupported(false)
+.required(true)
+.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor MAX_BUFFER_INTERVAL = new 
PropertyDescriptor.Builder()
+.name("Max Buffer Interval")
+.description("Buffering interval for messages (between 60 and 
900 seconds).")
+.defaultValue("60")
+.required(false)
+.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+.sensitive(false)
+.build();
+
+public static final PropertyDescriptor MAX_BUFFER_SIZE = new 
PropertyDescriptor.Builder()
+.name("Max Buffer Size (MB)")
+.description("Buffering size for messages (between 1MB and 
128MB).")
+.defaultValue("128")
+.required(false)
+.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+.sensitive(false)
+.build();
+
+public static final PropertyDescriptor BATCH_SIZE = new 
PropertyDescriptor.Builder()
--- End diff --

I see this is used, but again, consider the aforementioned validator to get 
a range and help remove the unneeded customValidate


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] nifi pull request: Nifi 1489 (Support for http proxy) + Nifi 1495 ...

2016-02-23 Thread apiri
Github user apiri commented on a diff in the pull request:

https://github.com/apache/nifi/pull/213#discussion_r53844969
  
--- Diff: 
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/AbstractKinesisFirehoseProcessor.java
 ---
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis.firehose;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import 
org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient;
+
+/**
+ * This class provides processor the base class for kinesis firehose
+ */
+public abstract class AbstractKinesisFirehoseProcessor extends 
AbstractAWSCredentialsProviderProcessor {
+
+public static final PropertyDescriptor 
KINESIS_FIREHOSE_DELIVERY_STREAM_NAME = new PropertyDescriptor.Builder()
+.name("Amazon Kinesis Firehose Delivery Stream Name")
+.description("The name of kinesis firehose delivery stream")
+.expressionLanguageSupported(false)
+.required(true)
+.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor MAX_BUFFER_INTERVAL = new 
PropertyDescriptor.Builder()
+.name("Max Buffer Interval")
+.description("Buffering interval for messages (between 60 and 
900 seconds).")
+.defaultValue("60")
+.required(false)
+.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+.sensitive(false)
+.build();
+
+public static final PropertyDescriptor MAX_BUFFER_SIZE = new 
PropertyDescriptor.Builder()
--- End diff --

Same commentary as above for MAX_BUFFER_INTERVAL.  Not sure how this is 
being used.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] nifi pull request: Nifi 1489 (Support for http proxy) + Nifi 1495 ...

2016-02-23 Thread apiri
Github user apiri commented on a diff in the pull request:

https://github.com/apache/nifi/pull/213#discussion_r53844010
  
--- Diff: 
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/AbstractKinesisFirehoseProcessor.java
 ---
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.kinesis.firehose;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import 
org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient;
+
+/**
+ * This class provides processor the base class for kinesis firehose
+ */
+public abstract class AbstractKinesisFirehoseProcessor extends 
AbstractAWSCredentialsProviderProcessor {
+
+public static final PropertyDescriptor 
KINESIS_FIREHOSE_DELIVERY_STREAM_NAME = new PropertyDescriptor.Builder()
+.name("Amazon Kinesis Firehose Delivery Stream Name")
+.description("The name of kinesis firehose delivery stream")
+.expressionLanguageSupported(false)
+.required(true)
+.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor MAX_BUFFER_INTERVAL = new 
PropertyDescriptor.Builder()
+.name("Max Buffer Interval")
+.description("Buffering interval for messages (between 60 and 
900 seconds).")
+.defaultValue("60")
+.required(false)
+.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
--- End diff --

I don't see this property being passed through to the associated 
KinesisFirehoseClient.  Am I overlooking something?

If so, you should prefer the usage of 
StandardValidators#createLongValidator to create a bounded range.  This would 
let you then not have to do your customValidate logic.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] nifi pull request: Nifi 1489 (Support for http proxy) + Nifi 1495 ...

2016-02-18 Thread mans2singh
Github user mans2singh commented on the pull request:

https://github.com/apache/nifi/pull/213#issuecomment-185913781
  
Just wanted to check if there is any other feedback/recommendation on 
Kinesis Firehose put processor.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] nifi pull request: Nifi 1489 (Support for http proxy) + Nifi 1495 ...

2016-02-16 Thread mans2singh
Github user mans2singh commented on the pull request:

https://github.com/apache/nifi/pull/213#issuecomment-184885151
  
@apiri I've rebased the forked branch.  Thanks  


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] nifi pull request: Nifi 1489 (Support for http proxy) + Nifi 1495 ...

2016-02-16 Thread apiri
Github user apiri commented on the pull request:

https://github.com/apache/nifi/pull/213#issuecomment-184862199
  
@mans2singh Would you be able to rebase and squash these commits now that 
the proxy support has been included?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] nifi pull request: Nifi 1489 (Support for http proxy) + Nifi 1495 ...

2016-02-09 Thread mans2singh
GitHub user mans2singh opened a pull request:

https://github.com/apache/nifi/pull/213

Nifi 1489 (Support for http proxy) + Nifi 1495 (AWS Kinesis Firehose)

This pull request combines Http proxy enhancement for aws processors 
(nifi-1489) and aws kinesis firehose processor (nifi-1495)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mans2singh/nifi nifi-1495

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/213.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #213


commit 83c05145ffe9caab932698cb970cde27d905cf81
Author: mans2singh 
Date:   2016-02-08T19:43:03Z

Added support for using proxy

commit 2b8772cdee082528ce121fcc9034c78f5960e233
Author: mans2singh 
Date:   2016-02-09T19:49:05Z

Added support for Kinesis Firehose

commit 7bf42bbf6038fc0147934ab4ed45cc1729e8e13f
Author: mans2singh 
Date:   2016-02-10T03:55:38Z

minor comments

commit fd10d0b76a66ce9fc7868f87c84c1ebc2c25f995
Author: mans2singh 
Date:   2016-02-08T19:43:03Z

Added support for using proxy

commit ca28ad9f05bcb1e3a191096bd8ca10de01628bbd
Author: mans2singh 
Date:   2016-02-09T19:49:05Z

Added support for Kinesis Firehose

commit 875c961c3181aed367361ea8bceba0fe80f5f274
Author: mans2singh 
Date:   2016-02-10T03:55:38Z

minor comments

commit 28e8cfda694a190b58d65a0657a04e92fff5c1ec
Author: mans2singh 
Date:   2016-02-10T04:01:47Z

Merge branch 'awsfirehose' of https://github.com/mans2singh/nifi into 
awsfirehose

commit 1ccfab0916e372c738aeac8cb2ab6556b4d8191f
Author: mans2singh 
Date:   2016-02-10T05:16:26Z

explicit check for proxy host and port

commit 4febccf6d3c2157cf0d68009beff5b9923e81d17
Author: mans2singh 
Date:   2016-02-10T05:22:19Z

added comments to kinesis test




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---