[GitHub] nifi pull request #2478: NIFI-4833 Add scanHBase Processor

2018-03-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/nifi/pull/2478


---


[GitHub] nifi pull request #2478: NIFI-4833 Add scanHBase Processor

2018-02-26 Thread bdesert
Github user bdesert commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2478#discussion_r170700319
  
--- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java ---
@@ -0,0 +1,564 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hbase;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.hbase.io.JsonFullRowSerializer;
+import org.apache.nifi.hbase.io.JsonQualifierAndValueRowSerializer;
+import org.apache.nifi.hbase.io.RowSerializer;
+import org.apache.nifi.hbase.scan.Column;
+import org.apache.nifi.hbase.scan.ResultCell;
+import org.apache.nifi.hbase.scan.ResultHandler;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.Tuple;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Pattern;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"hbase", "scan", "fetch", "get"})
+@CapabilityDescription("Scans and fetches rows from an HBase table. This processor may be used to fetch rows from an HBase table by specifying a range of rowkey values (start and/or end), "
++ "by time range, by filter expression, or any combination of them. "
++ "The order of records can be controlled by the Reversed property. "
++ "The number of rows retrieved by the processor can be limited.")
+@WritesAttributes({
+@WritesAttribute(attribute = "hbase.table", description = "The name of the HBase table that the row was fetched from"),
+@WritesAttribute(attribute = "hbase.resultset", description = "A JSON document/s representing the row/s. This property is only written when a Destination of flowfile-attributes is selected."),
+@WritesAttribute(attribute = "mime.type", description = "Set to application/json when using a Destination of flowfile-content, not set or modified otherwise"),
+@WritesAttribute(attribute = "hbase.rows.count", description = "Number of rows in the content of the given flow file"),
+@WritesAttribute(attribute = "scanhbase.results.found", description = "Indicates whether at least one row has been found in the given HBase table with the provided conditions. "
++ "Could be null (not present) if transferred to FAILURE")
+})
+public class ScanHBase extends AbstractProcessor {
+//enhanced regex for columns to allow "-" in column qualifier names
+static final Pattern COLUMNS_PATTERN = Pattern.compile("\\w+(:(\\w|-)+)?(?:,\\w+(:(\\w|-)+)?)*");
+static final 

[GitHub] nifi pull request #2478: NIFI-4833 Add scanHBase Processor

2018-02-26 Thread bdesert
Github user bdesert commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2478#discussion_r170697126
  
--- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java ---

[GitHub] nifi pull request #2478: NIFI-4833 Add scanHBase Processor

2018-02-26 Thread bbende
Github user bbende commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2478#discussion_r170684472
  
--- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java ---

[GitHub] nifi pull request #2478: NIFI-4833 Add scanHBase Processor

2018-02-26 Thread bbende
Github user bbende commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2478#discussion_r170683857
  
--- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java ---

+@WritesAttributes({
+@WritesAttribute(attribute = "hbase.table", description = "The name of the HBase table that the row was fetched from"),
+@WritesAttribute(attribute = "hbase.resultset", description = "A JSON document/s representing the row/s. This property is only written when a Destination of flowfile-attributes is selected."),
--- End diff --

Can this be removed? Doesn't look like we allow writing the results to an 
attribute, which is a good thing :)


---


[GitHub] nifi pull request #2478: NIFI-4833 Add scanHBase Processor

2018-02-26 Thread bbende
Github user bbende commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2478#discussion_r170688235
  
--- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java ---

[GitHub] nifi pull request #2478: NIFI-4833 Add scanHBase Processor

2018-02-26 Thread bbende
Github user bbende commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2478#discussion_r170687174
  
--- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java ---

[GitHub] nifi pull request #2478: NIFI-4833 Add scanHBase Processor

2018-02-26 Thread bbende
Github user bbende commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2478#discussion_r170686168
  
--- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java ---

[GitHub] nifi pull request #2478: NIFI-4833 Add scanHBase Processor

2018-02-21 Thread bdesert
Github user bdesert commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2478#discussion_r169749383
  
--- Diff: 
nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java
 ---
@@ -0,0 +1,562 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hbase;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.hbase.io.JsonFullRowSerializer;
+import org.apache.nifi.hbase.io.JsonQualifierAndValueRowSerializer;
+import org.apache.nifi.hbase.io.RowSerializer;
+import org.apache.nifi.hbase.scan.Column;
+import org.apache.nifi.hbase.scan.ResultCell;
+import org.apache.nifi.hbase.scan.ResultHandler;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.Tuple;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Pattern;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"hbase", "scan", "fetch", "get"})
+@CapabilityDescription("Scans and fetches rows from an HBase table. This processor may be used to fetch rows from an HBase table by specifying a range of rowkey values (start and/or end), "
++ "by time range, by filter expression, or any combination of them. \n"
++ "Order of records can be controlled by the Reversed property. "
++ "Number of rows retrieved by the processor can be limited.")
+@WritesAttributes({
+@WritesAttribute(attribute = "hbase.table", description = "The 
name of the HBase table that the row was fetched from"),
+@WritesAttribute(attribute = "hbase.resultset", description = "A 
JSON document/s representing the row/s. This property is only written when a 
Destination of flowfile-attributes is selected."),
+@WritesAttribute(attribute = "mime.type", description = "Set to 
application/json when using a Destination of flowfile-content, not set or 
modified otherwise"),
+@WritesAttribute(attribute = "hbase.rows.count", description = 
"Number of rows in the content of given flow file"),
+@WritesAttribute(attribute = "scanhbase.results.found", description = "Indicates whether at least one row has been found in the given HBase table with the provided conditions. "
++ "Could be null (not present) if transferred to FAILURE")
+})
+public class ScanHBase extends AbstractProcessor {
+//enhanced regex for columns to allow "-" in column qualifier names
+static final Pattern COLUMNS_PATTERN = 
Pattern.compile("\\w+(:(\\w|-)+)?(?:,\\w+(:(\\w|-)+)?)*");
+static 

[GitHub] nifi pull request #2478: NIFI-4833 Add scanHBase Processor

2018-02-21 Thread bdesert
Github user bdesert commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2478#discussion_r169732184
  
--- Diff: 
nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java
 ---
@@ -0,0 +1,562 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hbase;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.hbase.io.JsonFullRowSerializer;
+import org.apache.nifi.hbase.io.JsonQualifierAndValueRowSerializer;
+import org.apache.nifi.hbase.io.RowSerializer;
+import org.apache.nifi.hbase.scan.Column;
+import org.apache.nifi.hbase.scan.ResultCell;
+import org.apache.nifi.hbase.scan.ResultHandler;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.Tuple;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Pattern;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
--- End diff --

I was thinking about that before. Couldn't really decide, and then I took a 
look at DeleteHBaseRow and FetchHBaseRow and decided to keep it consistent.


---


[GitHub] nifi pull request #2478: NIFI-4833 Add scanHBase Processor

2018-02-21 Thread bdesert
Github user bdesert commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2478#discussion_r169680760
  
--- Diff: 
nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestScanHBase.java
 ---
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hbase;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestScanHBase {
+
+private ScanHBase proc;
+private MockHBaseClientService hBaseClientService;
+private TestRunner runner;
+
+@Before
+public void setup() throws InitializationException {
+proc = new ScanHBase();
+runner = TestRunners.newTestRunner(proc);
+
+hBaseClientService = new MockHBaseClientService();
+runner.addControllerService("hbaseClient", hBaseClientService);
+runner.enableControllerService(hBaseClientService);
+runner.setProperty(ScanHBase.HBASE_CLIENT_SERVICE, "hbaseClient");
+}
+
+@Test
+public void testColumnsValidation() {
+runner.setProperty(ScanHBase.TABLE_NAME, "table1");
+runner.setProperty(ScanHBase.START_ROW, "row1");
+runner.setProperty(ScanHBase.END_ROW, "row1");
+runner.assertValid();
+
+runner.setProperty(ScanHBase.COLUMNS, "cf1:cq1");
+runner.assertValid();
+
+runner.setProperty(ScanHBase.COLUMNS, "cf1");
+runner.assertValid();
+
+runner.setProperty(ScanHBase.COLUMNS, "cf1:cq1,cf2:cq2,cf3:cq3");
+runner.assertValid();
+
+runner.setProperty(ScanHBase.COLUMNS, "cf1,cf2:cq1,cf3");
+runner.assertValid();
+
+runner.setProperty(ScanHBase.COLUMNS, "cf1 cf2,cf3");
+runner.assertNotValid();
+
+runner.setProperty(ScanHBase.COLUMNS, "cf1:,cf2,cf3");
+runner.assertNotValid();
+
+runner.setProperty(ScanHBase.COLUMNS, "cf1:cq1,");
+runner.assertNotValid();
+}
+
+@Test
+public void testNoIncomingFlowFile() {
+runner.setProperty(ScanHBase.TABLE_NAME, "table1");
+runner.setProperty(ScanHBase.START_ROW, "row1");
+runner.setProperty(ScanHBase.END_ROW, "row1");
+
+runner.run();
+runner.assertTransferCount(ScanHBase.REL_FAILURE, 0);
+runner.assertTransferCount(ScanHBase.REL_SUCCESS, 0);
+runner.assertTransferCount(ScanHBase.REL_ORIGINAL, 0);
+
+Assert.assertEquals(0, hBaseClientService.getNumScans());
+}
+
+@Test
+public void testInvalidTableName() {
+runner.setProperty(ScanHBase.TABLE_NAME, "${hbase.table}");
+runner.setProperty(ScanHBase.START_ROW, "row1");
+runner.setProperty(ScanHBase.END_ROW, "row1");
+
+runner.enqueue("trigger flow file");
+runner.run();
+
+runner.assertTransferCount(ScanHBase.REL_FAILURE, 1);
+runner.assertTransferCount(ScanHBase.REL_SUCCESS, 0);
+runner.assertTransferCount(ScanHBase.REL_ORIGINAL, 0);
+
+Assert.assertEquals(0, hBaseClientService.getNumScans());
+}
+
+@Test
+public void testResultsNotFound() {
+runner.setProperty(ScanHBase.TABLE_NAME, "table1");
+runner.setProperty(ScanHBase.START_ROW, "row1");
+runner.setProperty(ScanHBase.END_ROW, "row1");
+
+runner.enqueue("trigger flow file");
+runner.run();
+
+runner.assertTransferCount(ScanHBase.REL_FAILURE, 0);
+

[GitHub] nifi pull request #2478: NIFI-4833 Add scanHBase Processor

2018-02-21 Thread bdesert
Github user bdesert commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2478#discussion_r169677516
  
--- Diff: 
nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestScanHBase.java
 ---
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hbase;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestScanHBase {
+
+private ScanHBase proc;
+private MockHBaseClientService hBaseClientService;
+private TestRunner runner;
+
+@Before
+public void setup() throws InitializationException {
+proc = new ScanHBase();
+runner = TestRunners.newTestRunner(proc);
+
+hBaseClientService = new MockHBaseClientService();
+runner.addControllerService("hbaseClient", hBaseClientService);
+runner.enableControllerService(hBaseClientService);
+runner.setProperty(ScanHBase.HBASE_CLIENT_SERVICE, "hbaseClient");
+}
+
+@Test
+public void testColumnsValidation() {
+runner.setProperty(ScanHBase.TABLE_NAME, "table1");
+runner.setProperty(ScanHBase.START_ROW, "row1");
+runner.setProperty(ScanHBase.END_ROW, "row1");
+runner.assertValid();
+
+runner.setProperty(ScanHBase.COLUMNS, "cf1:cq1");
+runner.assertValid();
+
+runner.setProperty(ScanHBase.COLUMNS, "cf1");
+runner.assertValid();
+
+runner.setProperty(ScanHBase.COLUMNS, "cf1:cq1,cf2:cq2,cf3:cq3");
+runner.assertValid();
+
+runner.setProperty(ScanHBase.COLUMNS, "cf1,cf2:cq1,cf3");
+runner.assertValid();
+
+runner.setProperty(ScanHBase.COLUMNS, "cf1 cf2,cf3");
+runner.assertNotValid();
+
+runner.setProperty(ScanHBase.COLUMNS, "cf1:,cf2,cf3");
+runner.assertNotValid();
+
+runner.setProperty(ScanHBase.COLUMNS, "cf1:cq1,");
+runner.assertNotValid();
+}
+
+@Test
+public void testNoIncomingFlowFile() {
+runner.setProperty(ScanHBase.TABLE_NAME, "table1");
+runner.setProperty(ScanHBase.START_ROW, "row1");
+runner.setProperty(ScanHBase.END_ROW, "row1");
+
+runner.run();
+runner.assertTransferCount(ScanHBase.REL_FAILURE, 0);
+runner.assertTransferCount(ScanHBase.REL_SUCCESS, 0);
+runner.assertTransferCount(ScanHBase.REL_ORIGINAL, 0);
+
+Assert.assertEquals(0, hBaseClientService.getNumScans());
+}
+
+@Test
+public void testInvalidTableName() {
+runner.setProperty(ScanHBase.TABLE_NAME, "${hbase.table}");
--- End diff --

Not setting a value for "hbase.table" is intentional. This test covers failure handling when the expression is invalid (cannot be evaluated). You can see that the FlowFile is expected at REL_FAILURE without any scans being performed. If I just didn't understand what you meant, please let me know.


---
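The column-spec cases exercised by testColumnsValidation above can be checked directly against the processor's COLUMNS_PATTERN. A minimal stand-alone sketch (pattern copied verbatim from the quoted diff; `valid` is a local helper, not part of ScanHBase):

```java
import java.util.regex.Pattern;

public class ColumnsPatternDemo {
    // Pattern copied verbatim from the quoted ScanHBase diff; "-" is allowed
    // in column qualifier names.
    static final Pattern COLUMNS_PATTERN =
            Pattern.compile("\\w+(:(\\w|-)+)?(?:,\\w+(:(\\w|-)+)?)*");

    static boolean valid(String columns) {
        return COLUMNS_PATTERN.matcher(columns).matches();
    }

    public static void main(String[] args) {
        // Valid forms from testColumnsValidation: family only, family:qualifier,
        // and comma-separated mixes of both.
        System.out.println(valid("cf1:cq1"));         // true
        System.out.println(valid("cf1"));             // true
        System.out.println(valid("cf1,cf2:cq1,cf3")); // true
        System.out.println(valid("cf1:cq-1"));        // true ("-" in qualifier)
        // Invalid forms: space separator, empty qualifier, trailing comma.
        System.out.println(valid("cf1 cf2,cf3"));     // false
        System.out.println(valid("cf1:,cf2,cf3"));    // false
        System.out.println(valid("cf1:cq1,"));        // false
    }
}
```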


[GitHub] nifi pull request #2478: NIFI-4833 Add scanHBase Processor

2018-02-21 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2478#discussion_r169624523
  
--- Diff: 
nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java
 ---
@@ -0,0 +1,562 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hbase;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.hbase.io.JsonFullRowSerializer;
+import org.apache.nifi.hbase.io.JsonQualifierAndValueRowSerializer;
+import org.apache.nifi.hbase.io.RowSerializer;
+import org.apache.nifi.hbase.scan.Column;
+import org.apache.nifi.hbase.scan.ResultCell;
+import org.apache.nifi.hbase.scan.ResultHandler;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.Tuple;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Pattern;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"hbase", "scan", "fetch", "get"})
+@CapabilityDescription("Scans and fetches rows from an HBase table. This processor may be used to fetch rows from an HBase table by specifying a range of rowkey values (start and/or end), "
++ "by time range, by filter expression, or any combination of them. \n"
++ "Order of records can be controlled by the Reversed property. "
++ "Number of rows retrieved by the processor can be limited.")
+@WritesAttributes({
+@WritesAttribute(attribute = "hbase.table", description = "The 
name of the HBase table that the row was fetched from"),
+@WritesAttribute(attribute = "hbase.resultset", description = "A 
JSON document/s representing the row/s. This property is only written when a 
Destination of flowfile-attributes is selected."),
+@WritesAttribute(attribute = "mime.type", description = "Set to 
application/json when using a Destination of flowfile-content, not set or 
modified otherwise"),
+@WritesAttribute(attribute = "hbase.rows.count", description = 
"Number of rows in the content of given flow file"),
+@WritesAttribute(attribute = "scanhbase.results.found", description = "Indicates whether at least one row has been found in the given HBase table with the provided conditions. "
++ "Could be null (not present) if transferred to FAILURE")
+})
+public class ScanHBase extends AbstractProcessor {
+//enhanced regex for columns to allow "-" in column qualifier names
+static final Pattern COLUMNS_PATTERN = 
Pattern.compile("\\w+(:(\\w|-)+)?(?:,\\w+(:(\\w|-)+)?)*");
+

[GitHub] nifi pull request #2478: NIFI-4833 Add scanHBase Processor

2018-02-21 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2478#discussion_r169622730
  
--- Diff: 
nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
 ---
@@ -430,6 +430,78 @@ public void scan(final String tableName, final byte[] 
startRow, final byte[] end
 }
 }
 
+@Override
+public void scan(final String tableName, final String startRow, final 
String endRow, String filterExpression,
+final Long timerangeMin, final Long timerangeMax, final 
Integer limitRows, final Boolean isReversed,
+final Collection columns, final ResultHandler handler) 
throws IOException {
+
+try (final Table table = 
connection.getTable(TableName.valueOf(tableName));
+final ResultScanner scanner = getResults(table, startRow, 
endRow, filterExpression, timerangeMin,
+timerangeMax, limitRows, isReversed, columns)) {
+
+int cnt = 0;
+final int lim = limitRows != null ? limitRows : 0;
+for (final Result result : scanner) {
+
+if (lim > 0 && cnt++ > lim) break;
+
+final byte[] rowKey = result.getRow();
+final Cell[] cells = result.rawCells();
+
+if (cells == null) {
+continue;
+}
+
+// convert HBase cells to NiFi cells
+final ResultCell[] resultCells = new 
ResultCell[cells.length];
+for (int i = 0; i < cells.length; i++) {
+final Cell cell = cells[i];
+final ResultCell resultCell = getResultCell(cell);
+resultCells[i] = resultCell;
+}
+
+// delegate to the handler
+handler.handle(rowKey, resultCells);
+}
+}
+
+}
+
+//
+protected ResultScanner getResults(final Table table, final String 
startRow, final String endRow, final String filterExpression, final Long 
timerangeMin, final Long timerangeMax,
+final Integer limitRows, final Boolean isReversed, final 
Collection columns)  throws IOException {
+final Scan scan = new Scan();
+if (!StringUtils.isBlank(startRow)) 
scan.setStartRow(startRow.getBytes(StandardCharsets.UTF_8));
+if (!StringUtils.isBlank(endRow))   scan.setStopRow(   
endRow.getBytes(StandardCharsets.UTF_8));
+
+
+Filter filter = null;
+if (columns != null) {
+for (Column col : columns) {
+if (col.getQualifier() == null) {
+scan.addFamily(col.getFamily());
+} else {
+scan.addColumn(col.getFamily(), col.getQualifier());
+}
+}
+}else if (!StringUtils.isBlank(filterExpression)) {
+ParseFilter parseFilter = new ParseFilter();
+filter = parseFilter.parseFilterString(filterExpression);
+}
+if (filter != null) scan.setFilter(filter);
+
+if (timerangeMin != null && timerangeMax != null) 
scan.setTimeRange(timerangeMin, timerangeMax);
--- End diff --

Separate lines please.


---
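The single-line guards being flagged read like this once expanded. A sketch of the requested style, where `ScanStub` is a hypothetical stand-in for `org.apache.hadoop.hbase.client.Scan` (which isn't on the classpath in this self-contained example):

```java
public class GuardStyleDemo {
    // Hypothetical stand-in for org.apache.hadoop.hbase.client.Scan; only
    // the two setters under review are modeled.
    static class ScanStub {
        Long min, max;
        Boolean reversed;
        void setTimeRange(long min, long max) { this.min = min; this.max = max; }
        void setReversed(boolean r) { this.reversed = r; }
    }

    // Applies the same null guards as the quoted getResults(), but with each
    // condition and body on separate lines with braces, per the review comment.
    static ScanStub configure(Long timerangeMin, Long timerangeMax, Boolean isReversed) {
        ScanStub scan = new ScanStub();
        if (timerangeMin != null && timerangeMax != null) {
            scan.setTimeRange(timerangeMin, timerangeMax);
        }
        if (isReversed != null) {
            scan.setReversed(isReversed);
        }
        return scan;
    }

    public static void main(String[] args) {
        ScanStub scan = configure(0L, 100L, Boolean.TRUE);
        System.out.println(scan.min + ".." + scan.max + " reversed=" + scan.reversed);
        // prints 0..100 reversed=true
    }
}
```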


[GitHub] nifi pull request #2478: NIFI-4833 Add scanHBase Processor

2018-02-21 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2478#discussion_r169623164
  
--- Diff: 
nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java
 ---
@@ -0,0 +1,562 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hbase;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.hbase.io.JsonFullRowSerializer;
+import org.apache.nifi.hbase.io.JsonQualifierAndValueRowSerializer;
+import org.apache.nifi.hbase.io.RowSerializer;
+import org.apache.nifi.hbase.scan.Column;
+import org.apache.nifi.hbase.scan.ResultCell;
+import org.apache.nifi.hbase.scan.ResultHandler;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.Tuple;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Pattern;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
--- End diff --

Are you sure you don't want to do `INPUT_ALLOWED`? I can see good reasons 
not to do that. If you want to make it optional, see `ExecuteSQL` for some code 
you can steal to make it not do processing on flowfile input AND time interval.


---


[GitHub] nifi pull request #2478: NIFI-4833 Add scanHBase Processor

2018-02-21 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2478#discussion_r169622786
  
--- Diff: 
nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
 ---
@@ -430,6 +430,78 @@ public void scan(final String tableName, final byte[] 
startRow, final byte[] end
 }
 }
 
+@Override
+public void scan(final String tableName, final String startRow, final 
String endRow, String filterExpression,
+final Long timerangeMin, final Long timerangeMax, final 
Integer limitRows, final Boolean isReversed,
+final Collection columns, final ResultHandler handler) 
throws IOException {
+
+try (final Table table = 
connection.getTable(TableName.valueOf(tableName));
+final ResultScanner scanner = getResults(table, startRow, 
endRow, filterExpression, timerangeMin,
+timerangeMax, limitRows, isReversed, columns)) {
+
+int cnt = 0;
+final int lim = limitRows != null ? limitRows : 0;
+for (final Result result : scanner) {
+
+if (lim > 0 && cnt++ > lim) break;
+
+final byte[] rowKey = result.getRow();
+final Cell[] cells = result.rawCells();
+
+if (cells == null) {
+continue;
+}
+
+// convert HBase cells to NiFi cells
+final ResultCell[] resultCells = new 
ResultCell[cells.length];
+for (int i = 0; i < cells.length; i++) {
+final Cell cell = cells[i];
+final ResultCell resultCell = getResultCell(cell);
+resultCells[i] = resultCell;
+}
+
+// delegate to the handler
+handler.handle(rowKey, resultCells);
+}
+}
+
+}
+
+//
+protected ResultScanner getResults(final Table table, final String 
startRow, final String endRow, final String filterExpression, final Long 
timerangeMin, final Long timerangeMax,
+final Integer limitRows, final Boolean isReversed, final 
Collection columns)  throws IOException {
+final Scan scan = new Scan();
+if (!StringUtils.isBlank(startRow)) 
scan.setStartRow(startRow.getBytes(StandardCharsets.UTF_8));
+if (!StringUtils.isBlank(endRow))   scan.setStopRow(   
endRow.getBytes(StandardCharsets.UTF_8));
+
+
+Filter filter = null;
+if (columns != null) {
+for (Column col : columns) {
+if (col.getQualifier() == null) {
+scan.addFamily(col.getFamily());
+} else {
+scan.addColumn(col.getFamily(), col.getQualifier());
+}
+}
+}else if (!StringUtils.isBlank(filterExpression)) {
+ParseFilter parseFilter = new ParseFilter();
+filter = parseFilter.parseFilterString(filterExpression);
+}
+if (filter != null) scan.setFilter(filter);
--- End diff --

Separate lines with curly brackets.


---


[GitHub] nifi pull request #2478: NIFI-4833 Add scanHBase Processor

2018-02-21 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2478#discussion_r169622823
  
--- Diff: 
nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
 ---
@@ -430,6 +430,78 @@ public void scan(final String tableName, final byte[] startRow, final byte[] end
 }
 }
 
+    @Override
+    public void scan(final String tableName, final String startRow, final String endRow, String filterExpression,
+            final Long timerangeMin, final Long timerangeMax, final Integer limitRows, final Boolean isReversed,
+            final Collection<Column> columns, final ResultHandler handler) throws IOException {
+
+        try (final Table table = connection.getTable(TableName.valueOf(tableName));
+                final ResultScanner scanner = getResults(table, startRow, endRow, filterExpression, timerangeMin,
+                        timerangeMax, limitRows, isReversed, columns)) {
+
+            int cnt = 0;
+            final int lim = limitRows != null ? limitRows : 0;
+            for (final Result result : scanner) {
+
+                if (lim > 0 && cnt++ > lim) break;
+
+                final byte[] rowKey = result.getRow();
+                final Cell[] cells = result.rawCells();
+
+                if (cells == null) {
+                    continue;
+                }
+
+                // convert HBase cells to NiFi cells
+                final ResultCell[] resultCells = new ResultCell[cells.length];
+                for (int i = 0; i < cells.length; i++) {
+                    final Cell cell = cells[i];
+                    final ResultCell resultCell = getResultCell(cell);
+                    resultCells[i] = resultCell;
+                }
+
+                // delegate to the handler
+                handler.handle(rowKey, resultCells);
+            }
+        }
+
+    }
+
+    //
+    protected ResultScanner getResults(final Table table, final String startRow, final String endRow, final String filterExpression, final Long timerangeMin, final Long timerangeMax,
+            final Integer limitRows, final Boolean isReversed, final Collection<Column> columns) throws IOException {
+        final Scan scan = new Scan();
+        if (!StringUtils.isBlank(startRow)) scan.setStartRow(startRow.getBytes(StandardCharsets.UTF_8));
+        if (!StringUtils.isBlank(endRow))   scan.setStopRow(endRow.getBytes(StandardCharsets.UTF_8));
+
+        Filter filter = null;
+        if (columns != null) {
+            for (Column col : columns) {
+                if (col.getQualifier() == null) {
+                    scan.addFamily(col.getFamily());
+                } else {
+                    scan.addColumn(col.getFamily(), col.getQualifier());
+                }
+            }
+        } else if (!StringUtils.isBlank(filterExpression)) {
+            ParseFilter parseFilter = new ParseFilter();
+            filter = parseFilter.parseFilterString(filterExpression);
+        }
+        if (filter != null) scan.setFilter(filter);
+
+        if (timerangeMin != null && timerangeMax != null) scan.setTimeRange(timerangeMin, timerangeMax);
+
+        // ->>> reserved for HBase v 2 or later
+        //if (limitRows != null && limitRows > 0){
+        //    scan.setLimit(limitRows)
+        //}
+
+        if (isReversed != null) scan.setReversed(isReversed);
--- End diff --

Same as above.
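[Editor's note] The row-limit check in the quoted `scan(...)` loop (`if (lim > 0 && cnt++ > lim) break;`) only breaks once the counter has already passed the limit, so it lets `limit + 1` rows through. A standalone sketch (class name `LimitCheckDemo` and the list-based stand-in for the scanner are mine, not part of the PR) that reproduces just that condition:

```java
import java.util.ArrayList;
import java.util.List;

public class LimitCheckDemo {

    // Mirrors the quoted loop: iterate "rows", applying the same
    // "cnt++ > lim" break condition used in the PR's scan(...) method.
    public static List<Integer> take(final List<Integer> rows, final Integer limitRows) {
        final List<Integer> out = new ArrayList<>();
        int cnt = 0;
        final int lim = limitRows != null ? limitRows : 0;
        for (final Integer row : rows) {
            if (lim > 0 && cnt++ > lim) break;   // condition quoted from the diff
            out.add(row);
        }
        return out;
    }

    public static void main(String[] args) {
        // With a limit of 2, the condition admits cnt values 0, 1 and 2
        // before breaking, i.e. three rows.
        System.out.println(take(List.of(1, 2, 3, 4, 5), 2).size()); // prints 3
    }
}
```

Checking before the increment (`cnt >= lim`, then increment) or comparing with `>=` would stop at exactly `lim` rows.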


---


[GitHub] nifi pull request #2478: NIFI-4833 Add scanHBase Processor

2018-02-21 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2478#discussion_r169622591
  
--- Diff: 
nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
 ---
@@ -430,6 +430,78 @@ public void scan(final String tableName, final byte[] startRow, final byte[] end
 }
 }
 
+    @Override
+    public void scan(final String tableName, final String startRow, final String endRow, String filterExpression,
+            final Long timerangeMin, final Long timerangeMax, final Integer limitRows, final Boolean isReversed,
+            final Collection<Column> columns, final ResultHandler handler) throws IOException {
+
+        try (final Table table = connection.getTable(TableName.valueOf(tableName));
+                final ResultScanner scanner = getResults(table, startRow, endRow, filterExpression, timerangeMin,
+                        timerangeMax, limitRows, isReversed, columns)) {
+
+            int cnt = 0;
+            final int lim = limitRows != null ? limitRows : 0;
+            for (final Result result : scanner) {
+
+                if (lim > 0 && cnt++ > lim) break;
+
+                final byte[] rowKey = result.getRow();
+                final Cell[] cells = result.rawCells();
+
+                if (cells == null) {
+                    continue;
+                }
+
+                // convert HBase cells to NiFi cells
+                final ResultCell[] resultCells = new ResultCell[cells.length];
+                for (int i = 0; i < cells.length; i++) {
+                    final Cell cell = cells[i];
+                    final ResultCell resultCell = getResultCell(cell);
+                    resultCells[i] = resultCell;
+                }
+
+                // delegate to the handler
+                handler.handle(rowKey, resultCells);
+            }
+        }
+
+    }
+
+    //
+    protected ResultScanner getResults(final Table table, final String startRow, final String endRow, final String filterExpression, final Long timerangeMin, final Long timerangeMax,
+            final Integer limitRows, final Boolean isReversed, final Collection<Column> columns) throws IOException {
+        final Scan scan = new Scan();
+        if (!StringUtils.isBlank(startRow)) scan.setStartRow(startRow.getBytes(StandardCharsets.UTF_8));
+        if (!StringUtils.isBlank(endRow))   scan.setStopRow(endRow.getBytes(StandardCharsets.UTF_8));
--- End diff --

See above.


---


[GitHub] nifi pull request #2478: NIFI-4833 Add scanHBase Processor

2018-02-21 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2478#discussion_r169622554
  
--- Diff: 
nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
 ---
@@ -430,6 +430,78 @@ public void scan(final String tableName, final byte[] startRow, final byte[] end
 }
 }
 
+    @Override
+    public void scan(final String tableName, final String startRow, final String endRow, String filterExpression,
+            final Long timerangeMin, final Long timerangeMax, final Integer limitRows, final Boolean isReversed,
+            final Collection<Column> columns, final ResultHandler handler) throws IOException {
+
+        try (final Table table = connection.getTable(TableName.valueOf(tableName));
+                final ResultScanner scanner = getResults(table, startRow, endRow, filterExpression, timerangeMin,
+                        timerangeMax, limitRows, isReversed, columns)) {
+
+            int cnt = 0;
+            final int lim = limitRows != null ? limitRows : 0;
+            for (final Result result : scanner) {
+
+                if (lim > 0 && cnt++ > lim) break;
+
+                final byte[] rowKey = result.getRow();
+                final Cell[] cells = result.rawCells();
+
+                if (cells == null) {
+                    continue;
+                }
+
+                // convert HBase cells to NiFi cells
+                final ResultCell[] resultCells = new ResultCell[cells.length];
+                for (int i = 0; i < cells.length; i++) {
+                    final Cell cell = cells[i];
+                    final ResultCell resultCell = getResultCell(cell);
+                    resultCells[i] = resultCell;
+                }
+
+                // delegate to the handler
+                handler.handle(rowKey, resultCells);
+            }
+        }
+
+    }
+
+    //
+    protected ResultScanner getResults(final Table table, final String startRow, final String endRow, final String filterExpression, final Long timerangeMin, final Long timerangeMax,
+            final Integer limitRows, final Boolean isReversed, final Collection<Column> columns) throws IOException {
+        final Scan scan = new Scan();
+        if (!StringUtils.isBlank(startRow)) scan.setStartRow(startRow.getBytes(StandardCharsets.UTF_8));
--- End diff --

This should be on separate lines like this:
```
if (!StringUtils.isBlank(startRow)) {
    scan.setStartRow(startRow.getBytes(StandardCharsets.UTF_8));
}
```


---


[GitHub] nifi pull request #2478: NIFI-4833 Add scanHBase Processor

2018-02-21 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2478#discussion_r169622126
  
--- Diff: 
nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestScanHBase.java
 ---
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hbase;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestScanHBase {
+
+    private ScanHBase proc;
+    private MockHBaseClientService hBaseClientService;
+    private TestRunner runner;
+
+    @Before
+    public void setup() throws InitializationException {
+        proc = new ScanHBase();
+        runner = TestRunners.newTestRunner(proc);
+
+        hBaseClientService = new MockHBaseClientService();
+        runner.addControllerService("hbaseClient", hBaseClientService);
+        runner.enableControllerService(hBaseClientService);
+        runner.setProperty(ScanHBase.HBASE_CLIENT_SERVICE, "hbaseClient");
+    }
+
+    @Test
+    public void testColumnsValidation() {
+        runner.setProperty(ScanHBase.TABLE_NAME, "table1");
+        runner.setProperty(ScanHBase.START_ROW, "row1");
+        runner.setProperty(ScanHBase.END_ROW, "row1");
+        runner.assertValid();
+
+        runner.setProperty(ScanHBase.COLUMNS, "cf1:cq1");
+        runner.assertValid();
+
+        runner.setProperty(ScanHBase.COLUMNS, "cf1");
+        runner.assertValid();
+
+        runner.setProperty(ScanHBase.COLUMNS, "cf1:cq1,cf2:cq2,cf3:cq3");
+        runner.assertValid();
+
+        runner.setProperty(ScanHBase.COLUMNS, "cf1,cf2:cq1,cf3");
+        runner.assertValid();
+
+        runner.setProperty(ScanHBase.COLUMNS, "cf1 cf2,cf3");
+        runner.assertNotValid();
+
+        runner.setProperty(ScanHBase.COLUMNS, "cf1:,cf2,cf3");
+        runner.assertNotValid();
+
+        runner.setProperty(ScanHBase.COLUMNS, "cf1:cq1,");
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void testNoIncomingFlowFile() {
+        runner.setProperty(ScanHBase.TABLE_NAME, "table1");
+        runner.setProperty(ScanHBase.START_ROW, "row1");
+        runner.setProperty(ScanHBase.END_ROW, "row1");
+
+        runner.run();
+        runner.assertTransferCount(ScanHBase.REL_FAILURE, 0);
+        runner.assertTransferCount(ScanHBase.REL_SUCCESS, 0);
+        runner.assertTransferCount(ScanHBase.REL_ORIGINAL, 0);
+
+        Assert.assertEquals(0, hBaseClientService.getNumScans());
+    }
+
+    @Test
+    public void testInvalidTableName() {
+        runner.setProperty(ScanHBase.TABLE_NAME, "${hbase.table}");
+        runner.setProperty(ScanHBase.START_ROW, "row1");
+        runner.setProperty(ScanHBase.END_ROW, "row1");
+
+        runner.enqueue("trigger flow file");
+        runner.run();
+
+        runner.assertTransferCount(ScanHBase.REL_FAILURE, 1);
+        runner.assertTransferCount(ScanHBase.REL_SUCCESS, 0);
+        runner.assertTransferCount(ScanHBase.REL_ORIGINAL, 0);
+
+        Assert.assertEquals(0, hBaseClientService.getNumScans());
+    }
+
+    @Test
+    public void testResultsNotFound() {
+        runner.setProperty(ScanHBase.TABLE_NAME, "table1");
+        runner.setProperty(ScanHBase.START_ROW, "row1");
+        runner.setProperty(ScanHBase.END_ROW, "row1");
+
+        runner.enqueue("trigger flow file");
+        runner.run();
+
+        runner.assertTransferCount(ScanHBase.REL_FAILURE, 0);
+

[GitHub] nifi pull request #2478: NIFI-4833 Add scanHBase Processor

2018-02-21 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2478#discussion_r169621787
  
--- Diff: 
nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestScanHBase.java
 ---
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hbase;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestScanHBase {
+
+    private ScanHBase proc;
+    private MockHBaseClientService hBaseClientService;
+    private TestRunner runner;
+
+    @Before
+    public void setup() throws InitializationException {
+        proc = new ScanHBase();
+        runner = TestRunners.newTestRunner(proc);
+
+        hBaseClientService = new MockHBaseClientService();
+        runner.addControllerService("hbaseClient", hBaseClientService);
+        runner.enableControllerService(hBaseClientService);
+        runner.setProperty(ScanHBase.HBASE_CLIENT_SERVICE, "hbaseClient");
+    }
+
+    @Test
+    public void testColumnsValidation() {
+        runner.setProperty(ScanHBase.TABLE_NAME, "table1");
+        runner.setProperty(ScanHBase.START_ROW, "row1");
+        runner.setProperty(ScanHBase.END_ROW, "row1");
+        runner.assertValid();
+
+        runner.setProperty(ScanHBase.COLUMNS, "cf1:cq1");
+        runner.assertValid();
+
+        runner.setProperty(ScanHBase.COLUMNS, "cf1");
+        runner.assertValid();
+
+        runner.setProperty(ScanHBase.COLUMNS, "cf1:cq1,cf2:cq2,cf3:cq3");
+        runner.assertValid();
+
+        runner.setProperty(ScanHBase.COLUMNS, "cf1,cf2:cq1,cf3");
+        runner.assertValid();
+
+        runner.setProperty(ScanHBase.COLUMNS, "cf1 cf2,cf3");
+        runner.assertNotValid();
+
+        runner.setProperty(ScanHBase.COLUMNS, "cf1:,cf2,cf3");
+        runner.assertNotValid();
+
+        runner.setProperty(ScanHBase.COLUMNS, "cf1:cq1,");
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void testNoIncomingFlowFile() {
+        runner.setProperty(ScanHBase.TABLE_NAME, "table1");
+        runner.setProperty(ScanHBase.START_ROW, "row1");
+        runner.setProperty(ScanHBase.END_ROW, "row1");
+
+        runner.run();
+        runner.assertTransferCount(ScanHBase.REL_FAILURE, 0);
+        runner.assertTransferCount(ScanHBase.REL_SUCCESS, 0);
+        runner.assertTransferCount(ScanHBase.REL_ORIGINAL, 0);
+
+        Assert.assertEquals(0, hBaseClientService.getNumScans());
+    }
+
+    @Test
+    public void testInvalidTableName() {
+        runner.setProperty(ScanHBase.TABLE_NAME, "${hbase.table}");
--- End diff --

Without calling `setVariable` and `setValidateExpressionLanguage` (think 
that's its name) I'm not sure if this is going to do more than test the raw 
value in that field. Granted, that is an invalid name.


---


[GitHub] nifi pull request #2478: NIFI-4833 Add scanHBase Processor

2018-02-19 Thread bdesert
Github user bdesert commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2478#discussion_r169176478
  
--- Diff: 
nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java
 ---
@@ -0,0 +1,562 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hbase;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.hbase.io.JsonFullRowSerializer;
+import org.apache.nifi.hbase.io.JsonQualifierAndValueRowSerializer;
+import org.apache.nifi.hbase.io.RowSerializer;
+import org.apache.nifi.hbase.scan.Column;
+import org.apache.nifi.hbase.scan.ResultCell;
+import org.apache.nifi.hbase.scan.ResultHandler;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.Tuple;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Pattern;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"hbase", "scan", "fetch", "get"})
+@CapabilityDescription("Scans and fetches rows from an HBase table. This processor may be used to fetch rows from hbase table by specifying a range of rowkey values (start and/or end ),"
+        + "by time range, by filter expression, or any combination of them. \n"
+        + "Order of records can be controlled by a property Reversed"
+        + "Number of rows retrieved by the processor can be limited.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "hbase.table", description = "The name of the HBase table that the row was fetched from"),
+        @WritesAttribute(attribute = "hbase.resultset", description = "A JSON document/s representing the row/s. This property is only written when a Destination of flowfile-attributes is selected."),
+        @WritesAttribute(attribute = "mime.type", description = "Set to application/json when using a Destination of flowfile-content, not set or modified otherwise"),
+        @WritesAttribute(attribute = "hbase.rows.count", description = "Number of rows in the content of given flow file"),
+        @WritesAttribute(attribute = "scanhbase.results.found", description = "Indicates whether at least one row has been found in given hbase table with provided conditions. "
+                + "Could be null (not present) if transfered to FAILURE")
+})
+public class ScanHBase extends AbstractProcessor {
+    //enhanced regex for columns to allow "-" in column qualifier names
+    static final Pattern COLUMNS_PATTERN = Pattern.compile("\\w+(:(\\w|-)+)?(?:,\\w+(:(\\w|-)+)?)*");
+    static 

[GitHub] nifi pull request #2478: NIFI-4833 Add scanHBase Processor

2018-02-19 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2478#discussion_r169072654
  
--- Diff: 
nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/ScanHBase.java
 ---
@@ -0,0 +1,562 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hbase;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.hbase.io.JsonFullRowSerializer;
+import org.apache.nifi.hbase.io.JsonQualifierAndValueRowSerializer;
+import org.apache.nifi.hbase.io.RowSerializer;
+import org.apache.nifi.hbase.scan.Column;
+import org.apache.nifi.hbase.scan.ResultCell;
+import org.apache.nifi.hbase.scan.ResultHandler;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.Tuple;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Pattern;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"hbase", "scan", "fetch", "get"})
+@CapabilityDescription("Scans and fetches rows from an HBase table. This processor may be used to fetch rows from hbase table by specifying a range of rowkey values (start and/or end ),"
+        + "by time range, by filter expression, or any combination of them. \n"
+        + "Order of records can be controlled by a property Reversed"
+        + "Number of rows retrieved by the processor can be limited.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "hbase.table", description = "The name of the HBase table that the row was fetched from"),
+        @WritesAttribute(attribute = "hbase.resultset", description = "A JSON document/s representing the row/s. This property is only written when a Destination of flowfile-attributes is selected."),
+        @WritesAttribute(attribute = "mime.type", description = "Set to application/json when using a Destination of flowfile-content, not set or modified otherwise"),
+        @WritesAttribute(attribute = "hbase.rows.count", description = "Number of rows in the content of given flow file"),
+        @WritesAttribute(attribute = "scanhbase.results.found", description = "Indicates whether at least one row has been found in given hbase table with provided conditions. "
+                + "Could be null (not present) if transfered to FAILURE")
+})
+public class ScanHBase extends AbstractProcessor {
+    //enhanced regex for columns to allow "-" in column qualifier names
+    static final Pattern COLUMNS_PATTERN = Pattern.compile("\\w+(:(\\w|-)+)?(?:,\\w+(:(\\w|-)+)?)*");
+
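[Editor's note] The `COLUMNS_PATTERN` regex quoted above accepts a comma-separated list of `family` or `family:qualifier` entries, allowing `-` in qualifier names. A standalone check (class name `ColumnsPatternDemo` is mine; the pattern string is copied verbatim from the diff):

```java
import java.util.regex.Pattern;

public class ColumnsPatternDemo {

    // Pattern string copied from the quoted ScanHBase.COLUMNS_PATTERN.
    static final Pattern COLUMNS_PATTERN =
            Pattern.compile("\\w+(:(\\w|-)+)?(?:,\\w+(:(\\w|-)+)?)*");

    public static void main(String[] args) {
        // family:qualifier entries, including a "-" in the qualifier
        System.out.println(COLUMNS_PATTERN.matcher("cf1:cq1,cf2,cf3:my-qual").matches()); // true
        // a dangling ":" with no qualifier is rejected
        System.out.println(COLUMNS_PATTERN.matcher("cf1:,cf2,cf3").matches()); // false
        // whitespace separators are rejected
        System.out.println(COLUMNS_PATTERN.matcher("cf1 cf2,cf3").matches()); // false
    }
}
```

These cases mirror the valid/invalid inputs exercised in the PR's `testColumnsValidation`; note the processor must use `matches()` (whole-string match), since `find()` would accept the invalid inputs too.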

[GitHub] nifi pull request #2478: NIFI-4833 Add scanHBase Processor

2018-02-17 Thread bdesert
GitHub user bdesert opened a pull request:

https://github.com/apache/nifi/pull/2478

NIFI-4833 Add scanHBase Processor

Thank you for submitting a contribution to Apache NiFi.

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

### For all changes:
- [x] Is there a JIRA ticket associated with this PR? Is it referenced 
 in the commit message?

- [x] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.

- [x] Has your PR been rebased against the latest commit within the target 
branch (typically master)?

- [ ] Is your initial contribution a single, squashed commit?

### For code changes:
- [x] Have you ensured that the full suite of tests is executed via mvn 
-Pcontrib-check clean install at the root nifi folder?
- [x] Have you written or updated unit tests to verify your changes?
- [ ] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)? 
- [ ] If applicable, have you updated the LICENSE file, including the main 
LICENSE file under nifi-assembly?
- [ ] If applicable, have you updated the NOTICE file, including the main 
NOTICE file found under nifi-assembly?
- [x] If adding new Properties, have you added .displayName in addition to 
.name (programmatic access) for each of the new properties?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in 
which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build 
issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bdesert/nifi NIFI-4833-Add-ScanHBase-processor

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/2478.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2478


commit 39bd6fb5d02eb7dca63830967823a4bb48c5712c
Author: Ed 
Date:   2018-02-17T21:26:04Z

Add ScanHBase Processor

New processor for scanning HBase records based on various params like range of rowkeys, range of timestamps. Supports result limit and reverse scan.

commit d2f5410be14a77f64e7ca5593e6c908620a8da58
Author: Ed 
Date:   2018-02-17T21:27:18Z

Adds Atlas Support for ScanHBase processor

Adds Atlas Support for ScanHBase processor




---