[GitHub] nifi pull request #2020: [NiFi-3973] Add PutKudu Processor for ingesting dat...

2017-08-24 Thread rickysaltzer
Github user rickysaltzer commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2020#discussion_r135044207
  
--- Diff: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi/fingerprint/flow1a.xml
 ---
@@ -31,7 +31,7 @@
 
 org.apache.nifi
 nifi-standard-nar
-1.3.0-SNAPSHOT
+1.4.0-SNAPSHOT
--- End diff --

I'm reverting this; not sure why it was changed. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] nifi pull request #2020: [NiFi-3973] Add PutKudu Processor for ingesting dat...

2017-08-24 Thread rickysaltzer
Github user rickysaltzer commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2020#discussion_r135044225
  
--- Diff: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/nifi/fingerprint/flow1b.xml
 ---
@@ -33,7 +33,7 @@
 
 org.apache.nifi
 nifi-standard-nar
-1.3.0-SNAPSHOT
+1.4.0-SNAPSHOT
--- End diff --

Same here 




[GitHub] nifi issue #2020: [NiFi-3973] Add PutKudu Processor for ingesting data to Ku...

2017-08-22 Thread rickysaltzer
Github user rickysaltzer commented on the issue:

https://github.com/apache/nifi/pull/2020
  
Hey @cammachusa, sorry for the gap in time; I've been pretty busy. I don't know if 
this is the best way to do it, but here's a shot :) 

First reset your `HEAD` inside your branch to the last commit before you 
started your PR. 

$ git reset --hard 02c05bc2037ca2e4ee1850a8cb765d85a4f3b8a3 

Add my repo as a remote

$ git remote add ricky https://github.com/rickysaltzer/nifi.git

Fetch my repo and branches

$ git fetch ricky

Merge my rebased branch into your branch. 

$ git merge ricky/NiFi-3973 

Make sure the following commit shows up as your latest:

$ git log --oneline | head -n1
1101ffc [NiFi-3973] Add PutKudu processor to ingest data to 




[GitHub] nifi issue #2020: [NiFi-3973] Add PutKudu Processor for ingesting data to Ku...

2017-08-17 Thread rickysaltzer
Github user rickysaltzer commented on the issue:

https://github.com/apache/nifi/pull/2020
  
You could force push my commit to this PR and that'll be fine, too. 




[GitHub] nifi issue #2020: [NiFi-3973] Add PutKudu Processor for ingesting data to Ku...

2017-08-16 Thread rickysaltzer
Github user rickysaltzer commented on the issue:

https://github.com/apache/nifi/pull/2020
  
@cammachusa - I updated my branch with a rebase, will you pull it down, 
make sure things are good for you and then force push it to this review? 
Thanks!! 




[GitHub] nifi issue #2020: [NiFi-3973] Add PutKudu Processor for ingesting data to Ku...

2017-08-16 Thread rickysaltzer
Github user rickysaltzer commented on the issue:

https://github.com/apache/nifi/pull/2020
  
I am going to be going through my final rounds of testing, so by the end of 
this week, I promise :) 




[GitHub] nifi issue #2020: [NiFi-3973] Add PutKudu Processor for ingesting data to Ku...

2017-08-16 Thread rickysaltzer
Github user rickysaltzer commented on the issue:

https://github.com/apache/nifi/pull/2020
  
False alarm, I got it now :) 





[GitHub] nifi issue #2020: [NiFi-3973] Add PutKudu Processor for ingesting data to Ku...

2017-08-16 Thread rickysaltzer
Github user rickysaltzer commented on the issue:

https://github.com/apache/nifi/pull/2020
  
@cammachusa I'm having a hard time getting your new commits...as well as 
even understanding how I got them in the first place. 

@joewitt am I missing something regarding getting the new commits? I'm 
running a fetch on the repo located here on github. 




[GitHub] nifi issue #2020: [NiFi-3973] Add PutKudu Processor for ingesting data to Ku...

2017-08-10 Thread rickysaltzer
Github user rickysaltzer commented on the issue:

https://github.com/apache/nifi/pull/2020
  
@cammachusa seems I can't update this PR with my own code. My changes are 
below so you can merge them in and then push them again.


https://github.com/rickysaltzer/nifi/commit/c17bbaff08685f74eef037f5e34dd6339b3aac68

I was able to get a sample pipeline working locally which is great. I want 
to do a little bit more testing before we merge it in.




[GitHub] nifi issue #2020: [NiFi-3973] Add PutKudu Processor for ingesting data to Ku...

2017-08-10 Thread rickysaltzer
Github user rickysaltzer commented on the issue:

https://github.com/apache/nifi/pull/2020
  
I've made some changes that allow the tests to run successfully. I'm going 
to test this out in an actual NiFi installation and then provide you my 
changes. Unfortunately this may not happen today due to my schedule. 




[GitHub] nifi issue #2020: [NiFi-3973] Add PutKudu Processor for ingesting data to Ku...

2017-08-10 Thread rickysaltzer
Github user rickysaltzer commented on the issue:

https://github.com/apache/nifi/pull/2020
  
@cammachusa thanks for the reminder :) - I've been a bit busy due to 
business travel. 





[GitHub] nifi issue #2020: [NiFi-3973] Add PutKudu Processor for ingesting data to Ku...

2017-08-07 Thread rickysaltzer
Github user rickysaltzer commented on the issue:

https://github.com/apache/nifi/pull/2020
  
Hey @cammachusa - I will take a look today; I've been traveling. 




[GitHub] nifi issue #2020: [NiFi-3973] Add PutKudu Processor for ingesting data to Ku...

2017-08-04 Thread rickysaltzer
Github user rickysaltzer commented on the issue:

https://github.com/apache/nifi/pull/2020
  
@cammachusa thanks! I think maybe you need to change the arguments given to 
your method once you use `OnEnabled`. I think it switches to 
`ConfigurationContext` instead of `ProcessContext` (or the other way) 




[GitHub] nifi issue #2020: [NiFi-3973] Add PutKudu Processor for ingesting data to Ku...

2017-08-04 Thread rickysaltzer
Github user rickysaltzer commented on the issue:

https://github.com/apache/nifi/pull/2020
  
@cammach @cammachusa 

Still seeing this error - what did you do to fix it?

testSkipHeadLineTrue(org.apache.nifi.processors.kudu.TestPutKudu)  Time elapsed: 0.001 sec  <<< ERROR!
java.lang.NoClassDefFoundError: Could not initialize class org.apache.nifi.processors.kudu.MockPutKudu
    at org.apache.nifi.processors.kudu.TestPutKudu.setUp(TestPutKudu.java:65)
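
For context, `NoClassDefFoundError: Could not initialize class ...` generally means the class's static initialization already failed once in the same JVM; the first failure is reported as an `ExceptionInInitializerError`, which carries the real cause. Below is a minimal sketch of that JVM behavior. The `Broken` class is a hypothetical stand-in, not the actual `MockPutKudu`, and nothing here is confirmed to be what went wrong in this test.

```java
class StaticInitDemo {

    // Hypothetical stand-in for a class whose static setup fails.
    static class Broken {
        static final Object RESOURCE = load();

        private static Object load() {
            throw new IllegalStateException("static setup failed");
        }
    }

    // Touches Broken and reports which error type the JVM raised.
    public static String touch() {
        try {
            return String.valueOf(Broken.RESOURCE);
        } catch (Throwable t) {
            return t.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(touch()); // first access: ExceptionInInitializerError
        System.out.println(touch()); // later access: NoClassDefFoundError
    }
}
```

If that is the situation here, the earliest failing test in the run would be the one showing the underlying exception.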





[GitHub] nifi issue #2020: [NiFi-3973] Add PutKudu Processor for ingesting data to Ku...

2017-08-04 Thread rickysaltzer
Github user rickysaltzer commented on the issue:

https://github.com/apache/nifi/pull/2020
  
I've got your changes locally to do some testing, thanks! 




[GitHub] nifi pull request #2020: [NiFi-3973] Add PutKudu Processor for ingesting dat...

2017-08-03 Thread rickysaltzer
Github user rickysaltzer commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2020#discussion_r131244333
  
--- Diff: 
nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKudu.java
 ---
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kudu;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduSession;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.Insert;
+import org.apache.kudu.client.Upsert;
+
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.FlowFileAccessException;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.hadoop.exception.RecordReaderFactoryException;
+
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.serialization.record.Record;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+public abstract class AbstractKudu extends AbstractProcessor {
+
+    protected static final PropertyDescriptor KUDU_MASTERS = new PropertyDescriptor.Builder()
+            .name("KUDU Masters")
+            .description("List all kudu masters's ip with port (e.g. 7051), comma separated")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    protected static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
+            .name("Table Name")
+            .description("The name of the Kudu Table to put data into")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("The service for reading records from incoming flow files.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    protected static final PropertyDescriptor SKIP_HEAD_LINE = new PropertyDescriptor.Builder()
+            .name("Skip head line")
+            .description("Set it to true if your first line is the header line e.g. column names")
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    protected static final PropertyDescriptor INSERT_OPERATION = new PropertyDescriptor.Builder()
+            .name("Insert Operation")
+            .description("Specify operation for this processor. Insert-Ignore will ignore duplicated rows")
+            .allowableValues(OperationType.INSE

[GitHub] nifi pull request #2020: [NiFi-3973] Add PutKudu Processor for ingesting dat...

2017-08-03 Thread rickysaltzer
Github user rickysaltzer commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2020#discussion_r131242503
  
--- Diff: 
nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKudu.java
 ---
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kudu;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduSession;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.Insert;
+import org.apache.kudu.client.Upsert;
+
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.FlowFileAccessException;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.hadoop.exception.RecordReaderFactoryException;
+
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.serialization.record.Record;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+public abstract class AbstractKudu extends AbstractProcessor {
+
+    protected static final PropertyDescriptor KUDU_MASTERS = new PropertyDescriptor.Builder()
+            .name("KUDU Masters")
+            .description("List all kudu masters's ip with port (e.g. 7051), comma separated")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    protected static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
+            .name("Table Name")
+            .description("The name of the Kudu Table to put data into")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("The service for reading records from incoming flow files.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    protected static final PropertyDescriptor SKIP_HEAD_LINE = new PropertyDescriptor.Builder()
+            .name("Skip head line")
+            .description("Set it to true if your first line is the header line e.g. column names")
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    protected static final PropertyDescriptor INSERT_OPERATION = new PropertyDescriptor.Builder()
+            .name("Insert Operation")
+            .description("Specify operation for this processor. Insert-Ignore will ignore duplicated rows")
+            .allowableValues(OperationType.INSERT.toString(), Opera

[GitHub] nifi issue #2020: [NiFi-3973] Add PutKudu Processor for ingesting data to Ku...

2017-08-03 Thread rickysaltzer
Github user rickysaltzer commented on the issue:

https://github.com/apache/nifi/pull/2020
  
I will continue reviewing later today or tomorrow, thanks for your 
patience. 




[GitHub] nifi issue #2020: [NiFi-3973] Add PutKudu Processor for ingesting data to Ku...

2017-07-31 Thread rickysaltzer
Github user rickysaltzer commented on the issue:

https://github.com/apache/nifi/pull/2020
  
@joewitt might be the best person to answer @cammachusa's question 
regarding Travis. I seem to recall that it can be finicky, but I'm not 100% 
sure of the current state of CI stability. 




[GitHub] nifi pull request #2020: [NiFi-3973] Add PutKudu Processor for ingesting dat...

2017-07-31 Thread rickysaltzer
Github user rickysaltzer commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2020#discussion_r130409278
  
--- Diff: 
nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKudu.java
 ---
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kudu;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduSession;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.Insert;
+import org.apache.kudu.client.Upsert;
+
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.FlowFileAccessException;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.hadoop.exception.RecordReaderFactoryException;
+
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.serialization.record.Record;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+public abstract class AbstractKudu extends AbstractProcessor {
+
+    protected static final PropertyDescriptor KUDU_MASTERS = new PropertyDescriptor.Builder()
+            .name("KUDU Masters")
+            .description("List all kudu masters's ip with port (e.g. 7051), comma separated")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    protected static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
+            .name("Table Name")
+            .description("The name of the Kudu Table to put data into")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("The service for reading records from incoming flow files.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    protected static final PropertyDescriptor SKIP_HEAD_LINE = new PropertyDescriptor.Builder()
+            .name("Skip head line")
+            .description("Set it to true if your first line is the header line e.g. column names")
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    protected static final PropertyDescriptor INSERT_OPERATION = new PropertyDescriptor.Builder()
+            .name("INSERT OPERATION")
+            .description("Specify operation for this processor. Insert-Ignore will ignore duplicated rows")
+            .allowableValues("Insert

[GitHub] nifi pull request #2020: [NiFi-3973] Add PutKudu Processor for ingesting dat...

2017-07-31 Thread rickysaltzer
Github user rickysaltzer commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2020#discussion_r130400960
  
--- Diff: 
nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKudu.java
 ---
@@ -84,6 +86,14 @@
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
 
+    protected static final PropertyDescriptor INSERT_OPERATION = new PropertyDescriptor.Builder()
+            .name("INSERT OPERATION")
+            .description("Specify operation for this processor. Insert-Ignore will ignore duplicated rows")
+            .allowableValues("Insert", "Insert-Ignore", "Upsert")
--- End diff --

It's generally best practice for us to use an Enum instead of a list of 
strings when using `allowableValues`. It mainly just lends itself to 
cleaner-looking code later down the road. 
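
To illustrate the suggestion, here is a minimal sketch of an enum backing the allowed values. The constants are assumptions that mirror the three strings in the diff, and the `names()` and `describe()` helpers are hypothetical; in the processor, the strings from `names()` would be what gets passed to `allowableValues(...)`.

```java
import java.util.Arrays;

class OperationTypeDemo {

    // Hypothetical enum mirroring the three string values in the diff.
    enum OperationType {
        INSERT,
        INSERT_IGNORE,
        UPSERT;

        // Constant names as strings, suitable for a String varargs parameter.
        static String[] names() {
            return Arrays.stream(values()).map(Enum::name).toArray(String[]::new);
        }
    }

    // Parsing the configured value once lets later code switch on the enum
    // instead of comparing raw strings.
    public static String describe(String configured) {
        OperationType type = OperationType.valueOf(configured);
        switch (type) {
            case INSERT:
                return "insert new rows";
            case INSERT_IGNORE:
                return "insert new rows, ignoring duplicates";
            case UPSERT:
                return "insert or update rows";
            default:
                throw new IllegalArgumentException("Unknown operation: " + type);
        }
    }

    public static void main(String[] args) {
        System.out.println(String.join(", ", OperationType.names()));
        System.out.println(describe("UPSERT"));
    }
}
```

With this shape, a misspelled value fails fast in `valueOf` rather than silently falling through a chain of string comparisons.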





[GitHub] nifi pull request #2020: [NiFi-3973] Add PutKudu Processor for ingesting dat...

2017-07-31 Thread rickysaltzer
Github user rickysaltzer commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2020#discussion_r130400374
  
--- Diff: 
nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKudu.java
 ---
@@ -84,6 +86,14 @@
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
 
+    protected static final PropertyDescriptor INSERT_OPERATION = new PropertyDescriptor.Builder()
+            .name("INSERT OPERATION")
--- End diff --

Let's just capitalize the words instead of each letter. 




[GitHub] nifi pull request #2020: [NiFi-3973] Add PutKudu Processor for ingesting dat...

2017-07-28 Thread rickysaltzer
Github user rickysaltzer commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2020#discussion_r130158483
  
--- Diff: 
nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKudu.java
 ---
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kudu;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduSession;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.Insert;
+
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.FlowFileAccessException;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.hadoop.exception.RecordReaderFactoryException;
+
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.serialization.record.Record;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+public abstract class AbstractKudu extends AbstractProcessor {
+
+    protected static final PropertyDescriptor KUDU_MASTERS = new PropertyDescriptor.Builder()
+            .name("KUDU Masters")
+            .description("List all kudu masters's ip with port (e.g. 7051), comma separated")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    protected static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
+            .name("Table Name")
+            .description("The name of the Kudu Table to put data into")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("The service for reading records from incoming flow files.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    protected static final PropertyDescriptor SKIP_HEAD_LINE = new PropertyDescriptor.Builder()
+            .name("Skip head line")
+            .description("Set it to true if your first line is the header line e.g. column names")
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    protected static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("A FlowFile is routed to this relationship after it has been successfully stored in Kudu")
+            .build();
+    protected static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .des

[GitHub] nifi pull request #2020: [NiFi-3973] Add PutKudu Processor for ingesting dat...

2017-07-27 Thread rickysaltzer
Github user rickysaltzer commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2020#discussion_r129937204
  
--- Diff: 
nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKudu.java
 ---
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kudu;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduSession;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.Insert;
+
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.FlowFileAccessException;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import 
org.apache.nifi.processors.hadoop.exception.RecordReaderFactoryException;
+
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.serialization.record.Record;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+public abstract class AbstractKudu extends AbstractProcessor {
+
+protected static final PropertyDescriptor KUDU_MASTERS = new 
PropertyDescriptor.Builder()
+.name("KUDU Masters")
+.description("List all Kudu masters' IP addresses with port (e.g. 7051), comma separated")
+.required(true)
+.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+.build();
+
+protected static final PropertyDescriptor TABLE_NAME = new 
PropertyDescriptor.Builder()
+.name("Table Name")
+.description("The name of the Kudu Table to put data into")
+.required(true)
+.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor RECORD_READER = new 
PropertyDescriptor.Builder()
+.name("record-reader")
+.displayName("Record Reader")
+.description("The service for reading records from incoming 
flow files.")
+.identifiesControllerService(RecordReaderFactory.class)
+.required(true)
+.build();
+
+protected static final PropertyDescriptor SKIP_HEAD_LINE = new 
PropertyDescriptor.Builder()
+.name("Skip head line")
+.description("Set it to true if your first line is the header 
line e.g. column names")
+.allowableValues("true", "false")
+.defaultValue("true")
+.required(true)
+.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+.build();
+
+protected static final Relationship REL_SUCCESS = new 
Relationship.Builder()
+.name("success")
+.description("A FlowFile is routed to this relationship after 
it has been successfully stored in Kudu")
+.build();
+protected static final Relationship REL_FAILURE = new 
Relationship.Builder()
+.name("failure")
+.des

[GitHub] nifi pull request #2020: [NiFi-3973] Add PutKudu Processor for ingesting dat...

2017-07-27 Thread rickysaltzer
Github user rickysaltzer commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2020#discussion_r129937107
  
--- Diff: 
nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKudu.java
 ---
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kudu;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduSession;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.Insert;
+
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.FlowFileAccessException;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import 
org.apache.nifi.processors.hadoop.exception.RecordReaderFactoryException;
+
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.serialization.record.Record;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+public abstract class AbstractKudu extends AbstractProcessor {
+
+protected static final PropertyDescriptor KUDU_MASTERS = new 
PropertyDescriptor.Builder()
+.name("KUDU Masters")
+.description("List all Kudu masters' IP addresses with port (e.g. 7051), comma separated")
+.required(true)
+.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+.build();
+
+protected static final PropertyDescriptor TABLE_NAME = new 
PropertyDescriptor.Builder()
+.name("Table Name")
+.description("The name of the Kudu Table to put data into")
+.required(true)
+.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor RECORD_READER = new 
PropertyDescriptor.Builder()
+.name("record-reader")
+.displayName("Record Reader")
+.description("The service for reading records from incoming 
flow files.")
+.identifiesControllerService(RecordReaderFactory.class)
+.required(true)
+.build();
+
+protected static final PropertyDescriptor SKIP_HEAD_LINE = new 
PropertyDescriptor.Builder()
+.name("Skip head line")
+.description("Set it to true if your first line is the header 
line e.g. column names")
+.allowableValues("true", "false")
+.defaultValue("true")
+.required(true)
+.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+.build();
+
+protected static final Relationship REL_SUCCESS = new 
Relationship.Builder()
+.name("success")
+.description("A FlowFile is routed to this relationship after 
it has been successfully stored in Kudu")
+.build();
+protected static final Relationship REL_FAILURE = new 
Relationship.Builder()
+.name("failure")
+.des

[GitHub] nifi pull request #2020: [NiFi-3973] Add PutKudu Processor for ingesting dat...

2017-07-27 Thread rickysaltzer
Github user rickysaltzer commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2020#discussion_r129936165
  
--- Diff: 
nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/AbstractKudu.java
 ---
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kudu;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.KuduSession;
+import org.apache.kudu.client.KuduTable;
+import org.apache.kudu.client.Insert;
+
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.FlowFileAccessException;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import 
org.apache.nifi.processors.hadoop.exception.RecordReaderFactoryException;
+
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.serialization.record.Record;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+public abstract class AbstractKudu extends AbstractProcessor {
+
+protected static final PropertyDescriptor KUDU_MASTERS = new 
PropertyDescriptor.Builder()
+.name("KUDU Masters")
+.description("List all Kudu masters' IP addresses with port (e.g. 7051), comma separated")
+.required(true)
+.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+.build();
+
+protected static final PropertyDescriptor TABLE_NAME = new 
PropertyDescriptor.Builder()
+.name("Table Name")
+.description("The name of the Kudu Table to put data into")
+.required(true)
+.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor RECORD_READER = new 
PropertyDescriptor.Builder()
+.name("record-reader")
+.displayName("Record Reader")
+.description("The service for reading records from incoming 
flow files.")
+.identifiesControllerService(RecordReaderFactory.class)
+.required(true)
+.build();
+
+protected static final PropertyDescriptor SKIP_HEAD_LINE = new 
PropertyDescriptor.Builder()
+.name("Skip head line")
+.description("Set it to true if your first line is the header 
line e.g. column names")
+.allowableValues("true", "false")
+.defaultValue("true")
+.required(true)
+.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+.build();
+
+protected static final Relationship REL_SUCCESS = new 
Relationship.Builder()
+.name("success")
+.description("A FlowFile is routed to this relationship after 
it has been successfully stored in Kudu")
+.build();
+protected static final Relationship REL_FAILURE = new 
Relationship.Builder()
+.name("failure")
+.des

[GitHub] nifi pull request #2020: [NiFi-3973] Add PutKudu Processor for ingesting dat...

2017-07-27 Thread rickysaltzer
Github user rickysaltzer commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2020#discussion_r129933513
  
--- Diff: 
nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
 ---
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kudu;
+
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.Insert;
+import org.apache.kudu.client.PartialRow;
+import org.apache.kudu.client.KuduTable;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.serialization.record.Record;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@EventDriven
+@SupportsBatching
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"put", "database", "NoSQL", "kudu", "HDFS"})
+@CapabilityDescription("Reads records from an incoming FlowFile using the 
provided Record Reader, and writes those records " +
+"to the specified Kudu's table. The schema for the table must be 
provided in the processor properties or from your source." +
+" If any error occurs while reading records from the input, or 
writing records to Kudu, the FlowFile will be routed to failure")
+
+public class PutKudu extends AbstractKudu {
+
+@Override
+protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+final List<PropertyDescriptor> properties = new ArrayList<>();
+properties.add(KUDU_MASTERS);
+properties.add(TABLE_NAME);
+properties.add(SKIP_HEAD_LINE);
+properties.add(RECORD_READER);
+
+return properties;
+}
+
+@Override
+public Set<Relationship> getRelationships() {
+final Set<Relationship> rels = new HashSet<>();
+rels.add(REL_SUCCESS);
+rels.add(REL_FAILURE);
+return rels;
+}
+
+@Override
+protected Insert insertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames) throws IllegalStateException, Exception {
+Insert insert = kuduTable.newInsert();
+PartialRow row = insert.getRow();
+Schema colSchema = kuduTable.getSchema();
+
+for (String colName : fieldNames) {
+int colIdx = this.getColumnIndex(colSchema, colName);
+if (colIdx != -1) {
+Type colType = 
colSchema.getColumnByIndex(colIdx).getType();
+
+switch (colType.getDataType()) {
+case BOOL:
+row.addBoolean(colIdx, 
record.getAsBoolean(colName));
+break;
+case FLOAT:
+row.addFloat(colIdx, record.getAsFloat(colName));
+break;
+case DOUBLE:
+row.addDouble(colIdx, record.getAsDouble(colName));
+break;
+case BINARY:
+row.addBinary(colIdx, 
record.getAsString(colName).getBytes());
+break;
+case INT8:
+case INT16:
--- End diff --

It would be useful to let users `Upsert` as well as `Insert`. Ideally this 
would be configurable via the processor's properties. 
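
As a rough sketch of what I have in mind (stand-in types only, not the real Kudu client classes; the `OperationType` enum and the idea of a dedicated operation property are hypothetical), the processor could parse the configured value once and branch at the point where it currently calls `kuduTable.newInsert()`:

```java
import java.util.Locale;

// Hypothetical sketch: these types are illustrative and are not part of NiFi or Kudu.
enum OperationType {
    INSERT,
    UPSERT;

    // Parse the value of a (hypothetical) processor property, ignoring case and padding.
    static OperationType fromProperty(String value) {
        return OperationType.valueOf(value.trim().toUpperCase(Locale.ROOT));
    }
}

final class OperationChooserSketch {
    // Stand-in for the branch point: returns the name of the KuduTable factory
    // method the processor would call for the configured operation type.
    static String factoryMethodFor(OperationType type) {
        switch (type) {
            case UPSERT:
                return "newUpsert";
            case INSERT:
            default:
                return "newInsert";
        }
    }

    public static void main(String[] args) {
        OperationType type = OperationType.fromProperty("upsert");
        System.out.println(factoryMethodFor(type));
    }
}
```

In the real processor both branches would presumably yield a Kudu `Operation`, so the code that fills in the row could stay shared between the two cases.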



[GitHub] nifi pull request #2020: [NiFi-3973] Add PutKudu Processor for ingesting dat...

2017-07-27 Thread rickysaltzer
Github user rickysaltzer commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2020#discussion_r129928513
  
--- Diff: 
nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java
 ---
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kudu;
+
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.client.Insert;
+import org.apache.kudu.client.PartialRow;
+import org.apache.kudu.client.KuduTable;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.serialization.record.Record;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@EventDriven
+@SupportsBatching
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"put", "database", "NoSQL", "kudu", "HDFS"})
+@CapabilityDescription("Reads records from an incoming FlowFile using the 
provided Record Reader, and writes those records " +
+"to the specified Kudu's table. The schema for the table must be 
provided in the processor properties or from your source." +
+" If any error occurs while reading records from the input, or 
writing records to Kudu, the FlowFile will be routed to failure")
+
+public class PutKudu extends AbstractKudu {
+
+@Override
+protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+final List<PropertyDescriptor> properties = new ArrayList<>();
+properties.add(KUDU_MASTERS);
+properties.add(TABLE_NAME);
+properties.add(SKIP_HEAD_LINE);
+properties.add(RECORD_READER);
+
+return properties;
+}
+
+@Override
+public Set<Relationship> getRelationships() {
+final Set<Relationship> rels = new HashSet<>();
+rels.add(REL_SUCCESS);
+rels.add(REL_FAILURE);
+return rels;
+}
+
+@Override
+protected Insert insertRecordToKudu(KuduTable kuduTable, Record record, List<String> fieldNames) throws IllegalStateException, Exception {
+Insert insert = kuduTable.newInsert();
+PartialRow row = insert.getRow();
+Schema colSchema = kuduTable.getSchema();
+
+for (String colName : fieldNames) {
+int colIdx = this.getColumnIndex(colSchema, colName);
+if (colIdx != -1) {
+Type colType = 
colSchema.getColumnByIndex(colIdx).getType();
+
+switch (colType.getDataType()) {
+case BOOL:
+row.addBoolean(colIdx, 
record.getAsBoolean(colName));
+break;
+case FLOAT:
+row.addFloat(colIdx, record.getAsFloat(colName));
+break;
+case DOUBLE:
+row.addDouble(colIdx, record.getAsDouble(colName));
+break;
+case BINARY:
+row.addBinary(colIdx, 
record.getAsString(colName).getBytes());
+break;
+case INT8:
+case INT16:
--- End diff --

Could we write this as a `short` using `row.addShort(..)`?
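
Something along these lines, perhaps. This is only a sketch: `NarrowingSketch` and its helpers are made-up names, and it assumes the record value arrives as an `int`. The result would then be passed to `row.addShort(..)` (or `row.addByte(..)` for `INT8`).

```java
// Hypothetical helpers; not part of NiFi or Kudu.
final class NarrowingSketch {
    // Narrow an int to a short, failing loudly instead of silently wrapping around.
    static short toShortExact(int value) {
        if (value < Short.MIN_VALUE || value > Short.MAX_VALUE) {
            throw new ArithmeticException("value out of range for INT16: " + value);
        }
        return (short) value;
    }

    // Same idea for the 8-bit column type.
    static byte toByteExact(int value) {
        if (value < Byte.MIN_VALUE || value > Byte.MAX_VALUE) {
            throw new ArithmeticException("value out of range for INT8: " + value);
        }
        return (byte) value;
    }

    public static void main(String[] args) {
        System.out.println(toShortExact(1234));
        System.out.println(toByteExact(-128));
    }
}
```

Failing on out-of-range values is a design choice in this sketch; routing the FlowFile to failure on that exception would keep bad rows out of the table.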



[GitHub] nifi pull request #188: NIFI-1337: Add Riemann Reporting Task

2016-10-04 Thread rickysaltzer
Github user rickysaltzer closed the pull request at:

https://github.com/apache/nifi/pull/188




[GitHub] nifi pull request #285: NIFI-1636: Print Stacktrace When Unexpected OnTrigge...

2016-10-04 Thread rickysaltzer
Github user rickysaltzer closed the pull request at:

https://github.com/apache/nifi/pull/285




[GitHub] nifi issue #285: NIFI-1636: Print Stacktrace When Unexpected OnTrigger Excep...

2016-10-04 Thread rickysaltzer
Github user rickysaltzer commented on the issue:

https://github.com/apache/nifi/pull/285
  
@trixpan thanks for the reminder, closing...




[GitHub] nifi pull request #850: NIFI-2547: Add DeleteHDFS Processor

2016-08-19 Thread rickysaltzer
Github user rickysaltzer commented on a diff in the pull request:

https://github.com/apache/nifi/pull/850#discussion_r75538007
  
--- Diff: 
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java
 ---
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.hadoop.KerberosProperties;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+public class TestDeleteHDFS {
+private NiFiProperties mockNiFiProperties;
+private FileSystem mockFileSystem;
+private KerberosProperties kerberosProperties;
+
+@Before
+public void setup() throws Exception {
+mockNiFiProperties = mock(NiFiProperties.class);
+when(mockNiFiProperties.getKerberosConfigurationFile()).thenReturn(null);
+kerberosProperties = KerberosProperties.create(mockNiFiProperties);
+mockFileSystem = mock(FileSystem.class);
--- End diff --

I could rewrite it to use the local filesystem, but I was following the 
pattern used by the other tests. 




[GitHub] nifi pull request #850: NIFI-2547: Add DeleteHDFS Processor

2016-08-19 Thread rickysaltzer
Github user rickysaltzer commented on a diff in the pull request:

https://github.com/apache/nifi/pull/850#discussion_r75537912
  
--- Diff: 
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java
 ---
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+@TriggerWhenEmpty
+@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
+@Tags({ "hadoop", "HDFS", "delete", "remove", "filesystem" })
+@CapabilityDescription("Deletes a file from HDFS. The file can be provided as an attribute from an incoming FlowFile, "
++ "or a statically set file that is periodically removed. If this processor has an incoming connection, it "
++ "will ignore running on a periodic basis and instead rely on incoming FlowFiles to trigger a delete. "
++ "Optionally, you may use a wildcard character to match multiple files or directories.")
+public class DeleteHDFS extends AbstractHadoopProcessor {
+public static final Relationship REL_SUCCESS = new 
Relationship.Builder()
+.name("success")
+.description("FlowFiles will be routed here if the delete 
command was successful")
+.build();
+
+public static final Relationship REL_FAILURE = new 
Relationship.Builder()
+.name("failure")
+.description("FlowFiles will be routed here if the delete 
command was unsuccessful")
+.build();
+
+public static final PropertyDescriptor FILE_OR_DIRECTORY = new 
PropertyDescriptor.Builder()
+.name("File or Directory")
+.description("The HDFS file or directory to delete. A wildcard 
expression may be used to only delete certain files")
+.required(true)
+.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+.expressionLanguageSupported(true)
+.build();
+
+public static final PropertyDescriptor RECURSIVE = new 
PropertyDescriptor.Builder()
+.name("Recursive")
+.description("Remove contents of a non-empty directory 
recursively")
+.allowableValues("true", "false")
+.required(true)
+.defaultValue("true")
+.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+.build();
+
+private static final Set<Relationship> relationships;
+
+static {
+final Set<Relationship> relationshipSet = new HashSet<>();
+relationshipSet.add(REL_SUCCESS);
+relationshipSet.add(REL_FAILURE);
+relationships = Collections.unmodifiableSet(relationshipSet);

[GitHub] nifi pull request #850: NIFI-2547: Add DeleteHDFS Processor

2016-08-12 Thread rickysaltzer
GitHub user rickysaltzer opened a pull request:

https://github.com/apache/nifi/pull/850

NIFI-2547: Add DeleteHDFS Processor

This processor adds the capability to delete files or
directories inside of HDFS.

Paths support both static and Expression Language values,
as well as globs (e.g. /data/for/2016/07/*).

This processor may be used standalone, as well as part of a
downstream connection.
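
To illustrate the intended glob behaviour, here is a small sketch that uses the JDK's `PathMatcher` as a stand-in. It is an assumption made for illustration only: it is not how the processor resolves paths on HDFS, Hadoop's glob rules may differ in detail, the class name `GlobSketch` is hypothetical, and the example assumes Unix-style paths.

```java
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;

// Hypothetical illustration of glob matching; not part of the processor.
final class GlobSketch {
    // Returns true if the path matches the glob. With the JDK's glob syntax,
    // '*' matches within a single path component and does not cross '/'.
    static boolean matches(String glob, String path) {
        PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:" + glob);
        return matcher.matches(Paths.get(path));
    }

    public static void main(String[] args) {
        String glob = "/data/for/2016/07/*";
        System.out.println(matches(glob, "/data/for/2016/07/part-0001"));
        System.out.println(matches(glob, "/data/for/2016/08/part-0001"));
    }
}
```

Under these assumptions, only direct children of /data/for/2016/07 match the pattern; nested directories would need their own pattern or the recursive option.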

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rickysaltzer/nifi NIFI-2547

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/850.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #850


commit d42fe48779eefbdfe936f2b3745b7eed1fe31d6e
Author: ricky <ri...@cloudera.com>
Date:   2016-08-10T23:14:39Z

NIFI-2547: Add DeleteHDFS Processor

This processor adds the capability to delete files or
directories inside of HDFS.

Paths supports both static and expression language values,
as well as glob support (e.g. /data/for/2016/07/*).

This processor may be used standalone, as well as part of a
downstream connection.



