[GitHub] nifi pull request: Nifi 1489 (Support for http proxy) + Nifi 1495 ...
Github user mans2singh commented on a diff in the pull request: https://github.com/apache/nifi/pull/213#discussion_r53893312 --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/firehose/ITPutKinesisFirehose.java --- @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.kinesis.firehose; + +import java.util.List; + +import org.apache.nifi.processors.aws.s3.FetchS3Object; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.After; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; + +/** + * This test contains both unit and integration test (integration tests are ignored by default) + */ +public class ITPutKinesisFirehose { + +private TestRunner runner; +protected final static String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties"; + +@Before +public void setUp() throws Exception { +runner = TestRunners.newTestRunner(PutKinesisFirehose.class); +runner.setProperty(PutKinesisFirehose.ACCESS_KEY, "abcd"); +runner.setProperty(PutKinesisFirehose.SECRET_KEY, "secret key"); + runner.setProperty(PutKinesisFirehose.KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, "deliveryName"); +runner.assertValid(); +} + +@After +public void tearDown() throws Exception { +runner = null; +} + +@Test +public void testCustomValidateBatchSize1Valid() { +runner.setProperty(PutKinesisFirehose.BATCH_SIZE, "1"); +runner.assertValid(); +} + +@Test +public void testCustomValidateBatchSize500Valid() { +runner.setProperty(PutKinesisFirehose.BATCH_SIZE, "500"); +runner.assertValid(); +} +@Test +public void testCustomValidateBatchSize501InValid() { +runner.setProperty(PutKinesisFirehose.BATCH_SIZE, "501"); +runner.assertNotValid(); +} + +@Test +public void testCustomValidateBufferSize1Valid() { +runner.setProperty(PutKinesisFirehose.MAX_BUFFER_SIZE, "1"); +runner.assertValid(); +} + +@Test +public void testCustomValidateBufferSize128Valid() { +runner.setProperty(PutKinesisFirehose.MAX_BUFFER_SIZE, "128"); +runner.assertValid(); +} +@Test +public void testCustomValidateBufferSize129InValid() { +runner.setProperty(PutKinesisFirehose.MAX_BUFFER_SIZE, "129"); +runner.assertNotValid(); +} + +@Test +public void testCustomValidateBufferInterval900Valid() { +runner.setProperty(PutKinesisFirehose.MAX_BUFFER_INTERVAL, "900"); +runner.assertValid(); +} + +@Test +public void testCustomValidateBufferInterval60Valid() { +runner.setProperty(PutKinesisFirehose.MAX_BUFFER_INTERVAL, "60"); +runner.assertValid(); +} + +@Test +public void testCustomValidateBufferInterval901InValid() { +runner.setProperty(PutKinesisFirehose.MAX_BUFFER_INTERVAL, "901"); +runner.assertNotValid(); +} + +@Test +public void testCustomValidateBufferInterval59InValid() { +runner.setProperty(PutKinesisFirehose.MAX_BUFFER_INTERVAL, "59"); +runner.assertNotValid(); +} + +/** + * Comment out ignore for integration tests (requires creds files) + */ +@Test +@Ignore --- End diff -- Removed ignore annotation for IT tests --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not
[GitHub] nifi pull request: Nifi 1489 (Support for http proxy) + Nifi 1495 ...
Github user mans2singh commented on a diff in the pull request: https://github.com/apache/nifi/pull/213#discussion_r53893277 --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java --- @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.kinesis.firehose; + +import java.io.ByteArrayOutputStream; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient; +import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchRequest; +import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResponseEntry; +import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResult; +import com.amazonaws.services.kinesisfirehose.model.Record; + +@SupportsBatching +@InputRequirement(Requirement.INPUT_REQUIRED) +@Tags({"amazon", "aws", "firehose", "kinesis", "put", "stream"}) +@CapabilityDescription("Sends the contents to a specified Amazon Kinesis Firehose. " ++ "In order to send data to firehose, the firehose delivery stream name has to be specified.") +@WritesAttributes({ +@WritesAttribute(attribute = "aws.kinesis.firehose.error.message", description = "Error message on posting message to AWS Kinesis Firehose"), +@WritesAttribute(attribute = "aws.kinesis.firehose.error.code", description = "Error code for the message when posting to AWS Kinesis Firehose"), +@WritesAttribute(attribute = "aws.kinesis.firehose.record.id", description = "Record id of the message posted to Kinesis Firehose")}) +public class PutKinesisFirehose extends AbstractKinesisFirehoseProcessor { + +/** + * Kinesis put record response error message + */ +public static final String AWS_KINESIS_FIREHOSE_ERROR_MESSAGE = "aws.kinesis.firehose.error.message"; + +/** + * Kinesis put record response error code + */ +public static final String AWS_KINESIS_FIREHOSE_ERROR_CODE = "aws.kinesis.firehose.error.code"; + +/** + * Kinesis put record response record id + */ +public static final String AWS_KINESIS_FIREHOSE_RECORD_ID = "aws.kinesis.firehose.record.id"; + +public static final List properties = Collections.unmodifiableList( +Arrays.asList(KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, MAX_BUFFER_INTERVAL, + MAX_BUFFER_SIZE, BATCH_SIZE, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, + PROXY_HOST,PROXY_HOST_PORT)); + +@Override +protected List getSupportedPropertyDescriptors() { +return properties; +} + +@Override +protected Collection customValidate(final ValidationContext validationContext) { +final List problems = new
[GitHub] nifi pull request: Nifi 1489 (Support for http proxy) + Nifi 1495 ...
Github user mans2singh commented on a diff in the pull request: https://github.com/apache/nifi/pull/213#discussion_r53893236 --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java --- @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.kinesis.firehose; + +import java.io.ByteArrayOutputStream; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient; +import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchRequest; +import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResponseEntry; +import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResult; +import com.amazonaws.services.kinesisfirehose.model.Record; + +@SupportsBatching +@InputRequirement(Requirement.INPUT_REQUIRED) +@Tags({"amazon", "aws", "firehose", "kinesis", "put", "stream"}) +@CapabilityDescription("Sends the contents to a specified Amazon Kinesis Firehose. " ++ "In order to send data to firehose, the firehose delivery stream name has to be specified.") +@WritesAttributes({ +@WritesAttribute(attribute = "aws.kinesis.firehose.error.message", description = "Error message on posting message to AWS Kinesis Firehose"), +@WritesAttribute(attribute = "aws.kinesis.firehose.error.code", description = "Error code for the message when posting to AWS Kinesis Firehose"), +@WritesAttribute(attribute = "aws.kinesis.firehose.record.id", description = "Record id of the message posted to Kinesis Firehose")}) +public class PutKinesisFirehose extends AbstractKinesisFirehoseProcessor { + +/** + * Kinesis put record response error message + */ +public static final String AWS_KINESIS_FIREHOSE_ERROR_MESSAGE = "aws.kinesis.firehose.error.message"; + +/** + * Kinesis put record response error code + */ +public static final String AWS_KINESIS_FIREHOSE_ERROR_CODE = "aws.kinesis.firehose.error.code"; + +/** + * Kinesis put record response record id + */ +public static final String AWS_KINESIS_FIREHOSE_RECORD_ID = "aws.kinesis.firehose.record.id"; + +public static final List properties = Collections.unmodifiableList( +Arrays.asList(KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, MAX_BUFFER_INTERVAL, + MAX_BUFFER_SIZE, BATCH_SIZE, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, + PROXY_HOST,PROXY_HOST_PORT)); + +@Override +protected List getSupportedPropertyDescriptors() { +return properties; +} + +@Override +protected Collection customValidate(final ValidationContext validationContext) { --- End diff -- I've removed the custom validator code --- If your project is
[GitHub] nifi pull request: Nifi 1489 (Support for http proxy) + Nifi 1495 ...
Github user mans2singh commented on a diff in the pull request: https://github.com/apache/nifi/pull/213#discussion_r53893209 --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/AbstractKinesisFirehoseProcessor.java --- @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.kinesis.firehose; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient; + +/** + * This class provides processor the base class for kinesis firehose + */ +public abstract class AbstractKinesisFirehoseProcessor extends AbstractAWSCredentialsProviderProcessor { + +public static final PropertyDescriptor KINESIS_FIREHOSE_DELIVERY_STREAM_NAME = new PropertyDescriptor.Builder() +.name("Amazon Kinesis Firehose Delivery Stream Name") +.description("The name of kinesis firehose delivery stream") +.expressionLanguageSupported(false) +.required(true) +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) +.build(); + +public static final PropertyDescriptor MAX_BUFFER_INTERVAL = new PropertyDescriptor.Builder() +.name("Max Buffer Interval") +.description("Buffering interval for messages (between 60 and 900 seconds).") +.defaultValue("60") +.required(false) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) +.sensitive(false) +.build(); + +public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor.Builder() +.name("Max Buffer Size (MB)") +.description("Buffering size for messages (between 1MB and 128MB).") +.defaultValue("128") +.required(false) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) +.sensitive(false) +.build(); + +public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() --- End diff -- @apiri - I've corrected this based on your feedback. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request: Nifi 1489 (Support for http proxy) + Nifi 1495 ...
Github user mans2singh commented on a diff in the pull request: https://github.com/apache/nifi/pull/213#discussion_r53893190 --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/AbstractKinesisFirehoseProcessor.java --- @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.kinesis.firehose; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient; + +/** + * This class provides processor the base class for kinesis firehose + */ +public abstract class AbstractKinesisFirehoseProcessor extends AbstractAWSCredentialsProviderProcessor { + +public static final PropertyDescriptor KINESIS_FIREHOSE_DELIVERY_STREAM_NAME = new PropertyDescriptor.Builder() +.name("Amazon Kinesis Firehose Delivery Stream Name") +.description("The name of kinesis firehose delivery stream") +.expressionLanguageSupported(false) +.required(true) +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) +.build(); + +public static final PropertyDescriptor MAX_BUFFER_INTERVAL = new PropertyDescriptor.Builder() +.name("Max Buffer Interval") +.description("Buffering interval for messages (between 60 and 900 seconds).") +.defaultValue("60") +.required(false) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) --- End diff -- @apiri - The max buffer size and interval are configuration properties and as you pointed out correctly not used in client. I've removed them and also have updated validators to use createLongValidator --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request: Nifi 1489 (Support for http proxy) + Nifi 1495 ...
Github user apiri commented on the pull request: https://github.com/apache/nifi/pull/213#issuecomment-187907196 @mans2singh Did an initial look over the code. The approach seems good but would like to understand more about the batching process and how we can perform this in a safe manner that does not exhaust heap so readily. Also, would you please be able to edit the PR name to something with NIFI-1495 in it so that hopefully the JIRA integration will link and include those items on the issue. It is unneeded to keep the reference to NIFI-1489 as those changes have been merged and PR closed in #209 Thanks! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request: Nifi 1489 (Support for http proxy) + Nifi 1495 ...
Github user apiri commented on a diff in the pull request: https://github.com/apache/nifi/pull/213#discussion_r53850051 --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/firehose/ITPutKinesisFirehose.java --- @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.kinesis.firehose; + +import java.util.List; + +import org.apache.nifi.processors.aws.s3.FetchS3Object; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.After; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; + +/** + * This test contains both unit and integration test (integration tests are ignored by default) + */ +public class ITPutKinesisFirehose { + +private TestRunner runner; +protected final static String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties"; + +@Before +public void setUp() throws Exception { +runner = TestRunners.newTestRunner(PutKinesisFirehose.class); +runner.setProperty(PutKinesisFirehose.ACCESS_KEY, "abcd"); +runner.setProperty(PutKinesisFirehose.SECRET_KEY, "secret key"); + runner.setProperty(PutKinesisFirehose.KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, "deliveryName"); +runner.assertValid(); +} + +@After +public void tearDown() throws Exception { +runner = null; +} + +@Test +public void testCustomValidateBatchSize1Valid() { +runner.setProperty(PutKinesisFirehose.BATCH_SIZE, "1"); +runner.assertValid(); +} + +@Test +public void testCustomValidateBatchSize500Valid() { +runner.setProperty(PutKinesisFirehose.BATCH_SIZE, "500"); +runner.assertValid(); +} +@Test +public void testCustomValidateBatchSize501InValid() { +runner.setProperty(PutKinesisFirehose.BATCH_SIZE, "501"); +runner.assertNotValid(); +} + +@Test +public void testCustomValidateBufferSize1Valid() { +runner.setProperty(PutKinesisFirehose.MAX_BUFFER_SIZE, "1"); +runner.assertValid(); +} + +@Test +public void testCustomValidateBufferSize128Valid() { +runner.setProperty(PutKinesisFirehose.MAX_BUFFER_SIZE, "128"); +runner.assertValid(); +} +@Test +public void testCustomValidateBufferSize129InValid() { +runner.setProperty(PutKinesisFirehose.MAX_BUFFER_SIZE, "129"); +runner.assertNotValid(); +} + +@Test +public void testCustomValidateBufferInterval900Valid() { +runner.setProperty(PutKinesisFirehose.MAX_BUFFER_INTERVAL, "900"); +runner.assertValid(); +} + +@Test +public void testCustomValidateBufferInterval60Valid() { +runner.setProperty(PutKinesisFirehose.MAX_BUFFER_INTERVAL, "60"); +runner.assertValid(); +} + +@Test +public void testCustomValidateBufferInterval901InValid() { +runner.setProperty(PutKinesisFirehose.MAX_BUFFER_INTERVAL, "901"); +runner.assertNotValid(); +} + +@Test +public void testCustomValidateBufferInterval59InValid() { +runner.setProperty(PutKinesisFirehose.MAX_BUFFER_INTERVAL, "59"); +runner.assertNotValid(); +} + +/** + * Comment out ignore for integration tests (requires creds files) + */ +@Test +@Ignore +public void testIntegrationSuccess() throws Exception { +runner = TestRunners.newTestRunner(PutKinesisFirehose.class); +runner.setProperty(PutKinesisFirehose.CREDENTIALS_FILE, CREDENTIALS_FILE); +
[GitHub] nifi pull request: Nifi 1489 (Support for http proxy) + Nifi 1495 ...
Github user apiri commented on a diff in the pull request: https://github.com/apache/nifi/pull/213#discussion_r53849876 --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/firehose/ITPutKinesisFirehose.java --- @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.kinesis.firehose; + +import java.util.List; + +import org.apache.nifi.processors.aws.s3.FetchS3Object; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.After; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; + +/** + * This test contains both unit and integration test (integration tests are ignored by default) + */ +public class ITPutKinesisFirehose { --- End diff -- customValidate tests and similar, if still needed or desired, should go into a standard TestPutKinesisFirehose class so that they are run on each build --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request: Nifi 1489 (Support for http proxy) + Nifi 1495 ...
Github user apiri commented on a diff in the pull request: https://github.com/apache/nifi/pull/213#discussion_r53849994 --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/firehose/ITPutKinesisFirehose.java --- @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.kinesis.firehose; + +import java.util.List; + +import org.apache.nifi.processors.aws.s3.FetchS3Object; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.After; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; + +/** + * This test contains both unit and integration test (integration tests are ignored by default) + */ +public class ITPutKinesisFirehose { + +private TestRunner runner; +protected final static String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties"; + +@Before +public void setUp() throws Exception { +runner = TestRunners.newTestRunner(PutKinesisFirehose.class); +runner.setProperty(PutKinesisFirehose.ACCESS_KEY, "abcd"); +runner.setProperty(PutKinesisFirehose.SECRET_KEY, "secret key"); + runner.setProperty(PutKinesisFirehose.KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, "deliveryName"); +runner.assertValid(); +} + +@After +public void tearDown() throws Exception { +runner = null; +} + +@Test +public void testCustomValidateBatchSize1Valid() { +runner.setProperty(PutKinesisFirehose.BATCH_SIZE, "1"); +runner.assertValid(); +} + +@Test +public void testCustomValidateBatchSize500Valid() { +runner.setProperty(PutKinesisFirehose.BATCH_SIZE, "500"); +runner.assertValid(); +} +@Test +public void testCustomValidateBatchSize501InValid() { +runner.setProperty(PutKinesisFirehose.BATCH_SIZE, "501"); +runner.assertNotValid(); +} + +@Test +public void testCustomValidateBufferSize1Valid() { +runner.setProperty(PutKinesisFirehose.MAX_BUFFER_SIZE, "1"); +runner.assertValid(); +} + +@Test +public void testCustomValidateBufferSize128Valid() { +runner.setProperty(PutKinesisFirehose.MAX_BUFFER_SIZE, "128"); +runner.assertValid(); +} +@Test +public void testCustomValidateBufferSize129InValid() { +runner.setProperty(PutKinesisFirehose.MAX_BUFFER_SIZE, "129"); +runner.assertNotValid(); +} + +@Test +public void testCustomValidateBufferInterval900Valid() { +runner.setProperty(PutKinesisFirehose.MAX_BUFFER_INTERVAL, "900"); +runner.assertValid(); +} + +@Test +public void testCustomValidateBufferInterval60Valid() { +runner.setProperty(PutKinesisFirehose.MAX_BUFFER_INTERVAL, "60"); +runner.assertValid(); +} + +@Test +public void testCustomValidateBufferInterval901InValid() { +runner.setProperty(PutKinesisFirehose.MAX_BUFFER_INTERVAL, "901"); +runner.assertNotValid(); +} + +@Test +public void testCustomValidateBufferInterval59InValid() { +runner.setProperty(PutKinesisFirehose.MAX_BUFFER_INTERVAL, "59"); +runner.assertNotValid(); +} + +/** + * Comment out ignore for integration tests (requires creds files) + */ +@Test +@Ignore --- End diff -- The profile to run ITs is not enabled by default and this Ignore is unneeded. Would prefer to have removed so that no code changes are needed to run them. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub
[GitHub] nifi pull request: Nifi 1489 (Support for http proxy) + Nifi 1495 ...
Github user apiri commented on a diff in the pull request: https://github.com/apache/nifi/pull/213#discussion_r53849365 --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java --- @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.kinesis.firehose; + +import java.io.ByteArrayOutputStream; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient; +import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchRequest; +import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResponseEntry; +import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResult; +import com.amazonaws.services.kinesisfirehose.model.Record; + +@SupportsBatching +@InputRequirement(Requirement.INPUT_REQUIRED) +@Tags({"amazon", "aws", "firehose", "kinesis", "put", "stream"}) +@CapabilityDescription("Sends the contents to a specified Amazon Kinesis Firehose. " ++ "In order to send data to firehose, the firehose delivery stream name has to be specified.") +@WritesAttributes({ +@WritesAttribute(attribute = "aws.kinesis.firehose.error.message", description = "Error message on posting message to AWS Kinesis Firehose"), +@WritesAttribute(attribute = "aws.kinesis.firehose.error.code", description = "Error code for the message when posting to AWS Kinesis Firehose"), +@WritesAttribute(attribute = "aws.kinesis.firehose.record.id", description = "Record id of the message posted to Kinesis Firehose")}) +public class PutKinesisFirehose extends AbstractKinesisFirehoseProcessor { + +/** + * Kinesis put record response error message + */ +public static final String AWS_KINESIS_FIREHOSE_ERROR_MESSAGE = "aws.kinesis.firehose.error.message"; + +/** + * Kinesis put record response error code + */ +public static final String AWS_KINESIS_FIREHOSE_ERROR_CODE = "aws.kinesis.firehose.error.code"; + +/** + * Kinesis put record response record id + */ +public static final String AWS_KINESIS_FIREHOSE_RECORD_ID = "aws.kinesis.firehose.record.id"; + +public static final List properties = Collections.unmodifiableList( +Arrays.asList(KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, MAX_BUFFER_INTERVAL, + MAX_BUFFER_SIZE, BATCH_SIZE, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, + PROXY_HOST,PROXY_HOST_PORT)); + +@Override +protected List getSupportedPropertyDescriptors() { +return properties; +} + +@Override +protected Collection customValidate(final ValidationContext validationContext) { +final List problems = new ArrayList<>(super.customValidate(validationContext));
[GitHub] nifi pull request: Nifi 1489 (Support for http proxy) + Nifi 1495 ...
Github user apiri commented on a diff in the pull request: https://github.com/apache/nifi/pull/213#discussion_r53846413 --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/firehose/ITPutKinesisFirehose.java --- @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.kinesis.firehose; + +import java.util.List; + +import org.apache.nifi.processors.aws.s3.FetchS3Object; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.After; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; + +/** + * This test contains both unit and integration test (integration tests are ignored by default) + */ +public class ITPutKinesisFirehose { + +private TestRunner runner; +protected final static String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties"; + +@Before +public void setUp() throws Exception { +runner = TestRunners.newTestRunner(PutKinesisFirehose.class); +runner.setProperty(PutKinesisFirehose.ACCESS_KEY, "abcd"); +runner.setProperty(PutKinesisFirehose.SECRET_KEY, "secret key"); + runner.setProperty(PutKinesisFirehose.KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, "deliveryName"); +runner.assertValid(); +} + +@After +public void tearDown() throws Exception { +runner = null; +} + +@Test +public void testCustomValidateBatchSize1Valid() { --- End diff -- These customValidate tests with the above improvements likely add little and could be removed with adjusted validators. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request: Nifi 1489 (Support for http proxy) + Nifi 1495 ...
Github user apiri commented on a diff in the pull request: https://github.com/apache/nifi/pull/213#discussion_r53845533 --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java --- @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.kinesis.firehose; + +import java.io.ByteArrayOutputStream; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient; +import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchRequest; +import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResponseEntry; +import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResult; +import com.amazonaws.services.kinesisfirehose.model.Record; + +@SupportsBatching +@InputRequirement(Requirement.INPUT_REQUIRED) +@Tags({"amazon", "aws", "firehose", "kinesis", "put", "stream"}) +@CapabilityDescription("Sends the contents to a specified Amazon Kinesis Firehose. " ++ "In order to send data to firehose, the firehose delivery stream name has to be specified.") +@WritesAttributes({ +@WritesAttribute(attribute = "aws.kinesis.firehose.error.message", description = "Error message on posting message to AWS Kinesis Firehose"), +@WritesAttribute(attribute = "aws.kinesis.firehose.error.code", description = "Error code for the message when posting to AWS Kinesis Firehose"), +@WritesAttribute(attribute = "aws.kinesis.firehose.record.id", description = "Record id of the message posted to Kinesis Firehose")}) +public class PutKinesisFirehose extends AbstractKinesisFirehoseProcessor { + +/** + * Kinesis put record response error message + */ +public static final String AWS_KINESIS_FIREHOSE_ERROR_MESSAGE = "aws.kinesis.firehose.error.message"; + +/** + * Kinesis put record response error code + */ +public static final String AWS_KINESIS_FIREHOSE_ERROR_CODE = "aws.kinesis.firehose.error.code"; + +/** + * Kinesis put record response record id + */ +public static final String AWS_KINESIS_FIREHOSE_RECORD_ID = "aws.kinesis.firehose.record.id"; + +public static final List properties = Collections.unmodifiableList( +Arrays.asList(KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, MAX_BUFFER_INTERVAL, + MAX_BUFFER_SIZE, BATCH_SIZE, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, + PROXY_HOST,PROXY_HOST_PORT)); + +@Override +protected List getSupportedPropertyDescriptors() { +return properties; +} + +@Override +protected Collection customValidate(final ValidationContext validationContext) { +final List problems = new ArrayList<>(super.customValidate(validationContext));
[GitHub] nifi pull request: Nifi 1489 (Support for http proxy) + Nifi 1495 ...
Github user apiri commented on a diff in the pull request: https://github.com/apache/nifi/pull/213#discussion_r53845261 --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java --- @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.kinesis.firehose; + +import java.io.ByteArrayOutputStream; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient; +import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchRequest; +import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResponseEntry; +import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResult; +import com.amazonaws.services.kinesisfirehose.model.Record; + +@SupportsBatching +@InputRequirement(Requirement.INPUT_REQUIRED) +@Tags({"amazon", "aws", "firehose", "kinesis", "put", "stream"}) +@CapabilityDescription("Sends the contents to a specified Amazon Kinesis Firehose. " ++ "In order to send data to firehose, the firehose delivery stream name has to be specified.") +@WritesAttributes({ +@WritesAttribute(attribute = "aws.kinesis.firehose.error.message", description = "Error message on posting message to AWS Kinesis Firehose"), +@WritesAttribute(attribute = "aws.kinesis.firehose.error.code", description = "Error code for the message when posting to AWS Kinesis Firehose"), +@WritesAttribute(attribute = "aws.kinesis.firehose.record.id", description = "Record id of the message posted to Kinesis Firehose")}) +public class PutKinesisFirehose extends AbstractKinesisFirehoseProcessor { + +/** + * Kinesis put record response error message + */ +public static final String AWS_KINESIS_FIREHOSE_ERROR_MESSAGE = "aws.kinesis.firehose.error.message"; + +/** + * Kinesis put record response error code + */ +public static final String AWS_KINESIS_FIREHOSE_ERROR_CODE = "aws.kinesis.firehose.error.code"; + +/** + * Kinesis put record response record id + */ +public static final String AWS_KINESIS_FIREHOSE_RECORD_ID = "aws.kinesis.firehose.record.id"; + +public static final List properties = Collections.unmodifiableList( +Arrays.asList(KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, MAX_BUFFER_INTERVAL, + MAX_BUFFER_SIZE, BATCH_SIZE, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, --- End diff -- If BUFFER_INTERVAL and BUFFER_SIZE are to go unused, they will also need to be removed from here. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at
[GitHub] nifi pull request: Nifi 1489 (Support for http proxy) + Nifi 1495 ...
Github user apiri commented on a diff in the pull request: https://github.com/apache/nifi/pull/213#discussion_r53845184 --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java --- @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.kinesis.firehose; + +import java.io.ByteArrayOutputStream; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient; +import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchRequest; +import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResponseEntry; +import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResult; +import com.amazonaws.services.kinesisfirehose.model.Record; + +@SupportsBatching +@InputRequirement(Requirement.INPUT_REQUIRED) +@Tags({"amazon", "aws", "firehose", "kinesis", "put", "stream"}) +@CapabilityDescription("Sends the contents to a specified Amazon Kinesis Firehose. " ++ "In order to send data to firehose, the firehose delivery stream name has to be specified.") +@WritesAttributes({ +@WritesAttribute(attribute = "aws.kinesis.firehose.error.message", description = "Error message on posting message to AWS Kinesis Firehose"), +@WritesAttribute(attribute = "aws.kinesis.firehose.error.code", description = "Error code for the message when posting to AWS Kinesis Firehose"), +@WritesAttribute(attribute = "aws.kinesis.firehose.record.id", description = "Record id of the message posted to Kinesis Firehose")}) +public class PutKinesisFirehose extends AbstractKinesisFirehoseProcessor { + +/** + * Kinesis put record response error message + */ +public static final String AWS_KINESIS_FIREHOSE_ERROR_MESSAGE = "aws.kinesis.firehose.error.message"; + +/** + * Kinesis put record response error code + */ +public static final String AWS_KINESIS_FIREHOSE_ERROR_CODE = "aws.kinesis.firehose.error.code"; + +/** + * Kinesis put record response record id + */ +public static final String AWS_KINESIS_FIREHOSE_RECORD_ID = "aws.kinesis.firehose.record.id"; + +public static final List properties = Collections.unmodifiableList( +Arrays.asList(KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, MAX_BUFFER_INTERVAL, + MAX_BUFFER_SIZE, BATCH_SIZE, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, + PROXY_HOST,PROXY_HOST_PORT)); + +@Override +protected List getSupportedPropertyDescriptors() { +return properties; +} + +@Override +protected Collection customValidate(final ValidationContext validationContext) { --- End diff -- With the alternative validators specified above, it should be
[GitHub] nifi pull request: Nifi 1489 (Support for http proxy) + Nifi 1495 ...
Github user apiri commented on a diff in the pull request: https://github.com/apache/nifi/pull/213#discussion_r53845121 --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/AbstractKinesisFirehoseProcessor.java --- @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.kinesis.firehose; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient; + +/** + * This class provides processor the base class for kinesis firehose + */ +public abstract class AbstractKinesisFirehoseProcessor extends AbstractAWSCredentialsProviderProcessor { + +public static final PropertyDescriptor KINESIS_FIREHOSE_DELIVERY_STREAM_NAME = new PropertyDescriptor.Builder() +.name("Amazon Kinesis Firehose Delivery Stream Name") +.description("The name of kinesis firehose delivery stream") +.expressionLanguageSupported(false) +.required(true) +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) +.build(); + +public static final PropertyDescriptor MAX_BUFFER_INTERVAL = new PropertyDescriptor.Builder() +.name("Max Buffer Interval") +.description("Buffering interval for messages (between 60 and 900 seconds).") +.defaultValue("60") +.required(false) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) +.sensitive(false) +.build(); + +public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor.Builder() +.name("Max Buffer Size (MB)") +.description("Buffering size for messages (between 1MB and 128MB).") +.defaultValue("128") +.required(false) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) +.sensitive(false) +.build(); + +public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() --- End diff -- I see this is used, but again, consider the aforementioned validator to get a range and help remove the unneeded customValidate --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request: Nifi 1489 (Support for http proxy) + Nifi 1495 ...
Github user apiri commented on a diff in the pull request: https://github.com/apache/nifi/pull/213#discussion_r53844969 --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/AbstractKinesisFirehoseProcessor.java --- @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.kinesis.firehose; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient; + +/** + * This class provides processor the base class for kinesis firehose + */ +public abstract class AbstractKinesisFirehoseProcessor extends AbstractAWSCredentialsProviderProcessor { + +public static final PropertyDescriptor KINESIS_FIREHOSE_DELIVERY_STREAM_NAME = new PropertyDescriptor.Builder() +.name("Amazon Kinesis Firehose Delivery Stream Name") +.description("The name of kinesis firehose delivery stream") +.expressionLanguageSupported(false) +.required(true) +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) +.build(); + +public static final PropertyDescriptor MAX_BUFFER_INTERVAL = new PropertyDescriptor.Builder() +.name("Max Buffer Interval") +.description("Buffering interval for messages (between 60 and 900 seconds).") +.defaultValue("60") +.required(false) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) +.sensitive(false) +.build(); + +public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor.Builder() --- End diff -- Same commentary as above for MAX_BUFFER_INTERVAL. Not sure how this is being used. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request: Nifi 1489 (Support for http proxy) + Nifi 1495 ...
Github user apiri commented on a diff in the pull request: https://github.com/apache/nifi/pull/213#discussion_r53844010 --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/AbstractKinesisFirehoseProcessor.java --- @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.kinesis.firehose; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient; + +/** + * This class provides processor the base class for kinesis firehose + */ +public abstract class AbstractKinesisFirehoseProcessor extends AbstractAWSCredentialsProviderProcessor { + +public static final PropertyDescriptor KINESIS_FIREHOSE_DELIVERY_STREAM_NAME = new PropertyDescriptor.Builder() +.name("Amazon Kinesis Firehose Delivery Stream Name") +.description("The name of kinesis firehose delivery stream") +.expressionLanguageSupported(false) +.required(true) +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) +.build(); + +public static final PropertyDescriptor MAX_BUFFER_INTERVAL = new PropertyDescriptor.Builder() +.name("Max Buffer Interval") +.description("Buffering interval for messages (between 60 and 900 seconds).") +.defaultValue("60") +.required(false) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) --- End diff -- I don't see this property being passed through to the associated KinesisFirehoseClient. Am I overlooking something? If so, you should prefer the usage of StandardValidators#createLongValidator to create a bounded range. This would let you then not have to do your customValidate logic. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request: Nifi 1489 (Support for http proxy) + Nifi 1495 ...
Github user mans2singh commented on the pull request: https://github.com/apache/nifi/pull/213#issuecomment-185913781 Just wanted to check if there is any other feedback/recommendation on Kinesis Firehose put processor. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request: Nifi 1489 (Support for http proxy) + Nifi 1495 ...
Github user mans2singh commented on the pull request: https://github.com/apache/nifi/pull/213#issuecomment-184885151 @apiri I've rebased the forked branch. Thanks --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request: Nifi 1489 (Support for http proxy) + Nifi 1495 ...
Github user apiri commented on the pull request: https://github.com/apache/nifi/pull/213#issuecomment-184862199 @mans2singh Would you be able to rebase and squash these commits now that the proxy support has been included? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request: Nifi 1489 (Support for http proxy) + Nifi 1495 ...
GitHub user mans2singh opened a pull request: https://github.com/apache/nifi/pull/213 Nifi 1489 (Support for http proxy) + Nifi 1495 (AWS Kinesis Firehose) This pull request combines Http proxy enhancement for aws processors (nifi-1489) and aws kinesis firehose processor (nifi-1495) You can merge this pull request into a Git repository by running: $ git pull https://github.com/mans2singh/nifi nifi-1495 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi/pull/213.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #213 commit 83c05145ffe9caab932698cb970cde27d905cf81 Author: mans2singhDate: 2016-02-08T19:43:03Z Added support for using proxy commit 2b8772cdee082528ce121fcc9034c78f5960e233 Author: mans2singh Date: 2016-02-09T19:49:05Z Added support for Kinesis Firehose commit 7bf42bbf6038fc0147934ab4ed45cc1729e8e13f Author: mans2singh Date: 2016-02-10T03:55:38Z minor comments commit fd10d0b76a66ce9fc7868f87c84c1ebc2c25f995 Author: mans2singh Date: 2016-02-08T19:43:03Z Added support for using proxy commit ca28ad9f05bcb1e3a191096bd8ca10de01628bbd Author: mans2singh Date: 2016-02-09T19:49:05Z Added support for Kinesis Firehose commit 875c961c3181aed367361ea8bceba0fe80f5f274 Author: mans2singh Date: 2016-02-10T03:55:38Z minor comments commit 28e8cfda694a190b58d65a0657a04e92fff5c1ec Author: mans2singh Date: 2016-02-10T04:01:47Z Merge branch 'awsfirehose' of https://github.com/mans2singh/nifi into awsfirehose commit 1ccfab0916e372c738aeac8cb2ab6556b4d8191f Author: mans2singh Date: 2016-02-10T05:16:26Z explicit check for proxy host and port commit 4febccf6d3c2157cf0d68009beff5b9923e81d17 Author: mans2singh Date: 2016-02-10T05:22:19Z added comments to kinesis test --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---