[GitHub] nifi issue #2478: NIFI-4833 Add scanHBase Processor

2018-03-13 Thread bbende
Github user bbende commented on the issue:

https://github.com/apache/nifi/pull/2478
  
Latest update looks good, going to merge


---


[GitHub] nifi issue #2478: NIFI-4833 Add scanHBase Processor

2018-03-09 Thread bbende
Github user bbende commented on the issue:

https://github.com/apache/nifi/pull/2478
  
@bdesert thanks for the updates, there is one error case I think we need to 
handle and then we should be good to go...

The case is when an exception happens half-way through handling the results 
and gets caught in the try-catch on lines 385-389. Currently we only transfer 
the original flow file to failure and return, but the handler may have a flow 
file it created and was writing results to, and that flow file would need to be 
removed from the session.
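A minimal, self-contained sketch of the cleanup pattern (the `MockSession` and `FlowFile` stand-ins below only mimic ProcessSession semantics; they are not the real NiFi API or the actual PR code):

```java
import java.util.ArrayList;
import java.util.List;

// Stand-ins for NiFi's ProcessSession/FlowFile; illustrative only.
public class CleanupSketch {
    static class FlowFile {}

    static class MockSession {
        final List<FlowFile> tracked = new ArrayList<>();
        FlowFile create() { FlowFile ff = new FlowFile(); tracked.add(ff); return ff; }
        void remove(FlowFile ff) { tracked.remove(ff); }
    }

    public static void main(String[] args) {
        MockSession session = new MockSession();
        FlowFile original = session.create();
        // created by the result handler part-way through the scan
        FlowFile handlerFlowFile = session.create();

        try {
            throw new RuntimeException("scan failed half-way through");
        } catch (RuntimeException e) {
            // On failure, drop the handler's partially written flow file so it
            // is not left dangling in the session; only the original flow file
            // is routed to failure.
            if (handlerFlowFile != null) {
                session.remove(handlerFlowFile);
            }
        }
        System.out.println(session.tracked.size()); // only the original remains
    }
}
```

The point is simply that every flow file the session knows about must either be transferred or removed before returning.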

The reason this isn't caught in the JUnit tests is that MockHBaseClientService 
currently lets you set throwException, which throws an exception right at the 
beginning of scan before the handler has ever been called. If you want to 
create a way to test this, you could introduce a new boolean like 
throwExceptionAfterNumResults and also take in an integer number of results. 

For quick testing I hacked a change into the MockHBaseClientService so that 
it throws an exception after the first result:

```
@Override
public void scan(String tableName, String startRow, String endRow, String filterExpression,
        Long timerangeMin, Long timerangeMax, Integer limitRows, Boolean isReversed,
        Collection<Column> columns, ResultHandler handler) throws IOException {
    //if (throwException) {
    //    throw new IOException("exception");
    //}

    // pass the staged data to the handler, throwing after the first result
    int resultCount = 0;
    for (final Map.Entry<String, ResultCell[]> entry : results.entrySet()) {
        handler.handle(entry.getKey().getBytes(StandardCharsets.UTF_8), entry.getValue());
        resultCount++;

        if (resultCount > 0) {
            throw new IOException("exception");
        }
    }

    numScans++;
}
```
Then updated the test case:
```
@Test
public void testScanWhenScanThrowsException() {
    //hBaseClientService.setThrowException(true);

    final Map<String, String> cells = new HashMap<>();
    cells.put("cq1", "val1");
    cells.put("cq2", "val2");

    final long ts1 = 123456789;
    hBaseClientService.addResult("row1", cells, ts1);
    hBaseClientService.addResult("row2", cells, ts1);

    runner.setProperty(ScanHBase.TABLE_NAME, "table1");
    runner.setProperty(ScanHBase.START_ROW, "row1");
    runner.setProperty(ScanHBase.END_ROW, "row1");

    runner.enqueue("trigger flow file");
    runner.run();

    runner.assertTransferCount(ScanHBase.REL_FAILURE, 1);
    runner.assertTransferCount(ScanHBase.REL_SUCCESS, 0);
    runner.assertTransferCount(ScanHBase.REL_ORIGINAL, 0);

    Assert.assertEquals(0, hBaseClientService.getNumScans());
}
```


---


[GitHub] nifi issue #2478: NIFI-4833 Add scanHBase Processor

2018-03-07 Thread bdesert
Github user bdesert commented on the issue:

https://github.com/apache/nifi/pull/2478
  
@bbende ,
Bryan, committed the changes. Tested on a cluster, works as expected.
When you have time, please review.


---


[GitHub] nifi issue #2478: NIFI-4833 Add scanHBase Processor

2018-03-05 Thread bdesert
Github user bdesert commented on the issue:

https://github.com/apache/nifi/pull/2478
  
@bbende , thank you!
Both comments make sense. Will commit these changes soon.


---


[GitHub] nifi issue #2478: NIFI-4833 Add scanHBase Processor

2018-03-05 Thread bbende
Github user bbende commented on the issue:

https://github.com/apache/nifi/pull/2478
  
One other question: what do you envision people will most likely do with the 
output of this processor?

The reason I'm asking is because I'm debating if it makes sense to write 
multiple JSON documents to a single flow file without wrapping them in an 
array. GetHBase and FetchHBase didn't have this problem because they wrote a 
row per flow file (which probably wasn't a good idea for GetHBase).

As an example scenario, say we have a bunch of rows coming out of this 
processor using the col-qual-val format like:
```
{"id":"", "message":"The time is Mon Mar 05 10:20:07 EST 2018"}
{"id":"", "message":"The time is Mon Mar 05 10:21:03 EST 2018"}
{"id":"", "message":"The time is Mon Mar 05 10:22:44 EST 2018"}
{"id":"", "message":"The time is Mon Mar 05 10:22:44 EST 2018"}
{"id":"", "message":"The time is Mon Mar 05 10:22:44 EST 2018"}
{"id":"", "message":"The time is Mon Mar 05 10:22:44 EST 2018"}
{"id":"", "message":"The time is Mon Mar 05 10:22:44 EST 2018"}
{"id":"", "message":"The time is Mon Mar 05 10:22:44 EST 2018"}
{"id":"", "message":"The time is Mon Mar 05 10:22:44 EST 2018"}
```

If we then created a schema for this:
```
{
  "name": "scan",
  "namespace": "nifi",
  "type": "record",
  "fields": [
{ "name": "id", "type": "string" },
{ "name": "message", "type": "string" }
  ]
}
```
Then tried to use ConvertRecord with a JsonTreeReader and 
CsvRecordSetWriter, to convert from JSON to CSV, we get:
```
id,message
"",The time is Mon Mar 05 10:20:07 EST 2018
```
It only ends up converting the first JSON document because the 
JsonTreeReader doesn't know how to read multiple records unless it's a JSON 
array.
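To illustrate (a hypothetical output format, abbreviated from the rows above, not something the PR currently produces): if the processor wrapped the rows in a JSON array, the record readers would handle all of them out of the box:

```
[
  {"id":"", "message":"The time is Mon Mar 05 10:20:07 EST 2018"},
  {"id":"", "message":"The time is Mon Mar 05 10:21:03 EST 2018"}
]
```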

There may be cases where the current output makes sense so I'm not saying 
to change it yet, but just trying to think of what the most common scenario 
will be.


---


[GitHub] nifi issue #2478: NIFI-4833 Add scanHBase Processor

2018-03-05 Thread bbende
Github user bbende commented on the issue:

https://github.com/apache/nifi/pull/2478
  
@bdesert Thanks for the updates, was reviewing the code again and I think 
we need to change the way the `ScanHBaseResultHandler` works...

Currently it adds rows to a list in memory until bulk size is reached, and 
since bulk size defaults to 0, the default case will be that bulk size is never 
reached and all the rows are left as "hanging" rows. This means if someone 
scans a table with 1 million rows, all 1 million rows will be held in memory 
before being written to the flow file, which would not be good for memory usage.

We should be able to write row by row to the flow file and never add them 
to a list. Inside the handler we can use `session.append(flowFile, (out) ->` to 
append a row at a time to the flow file. I think we can then do away with the 
"hanging rows" concept because there won't be anything buffered in memory.
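A self-contained sketch of the append-per-row idea (the `append` helper below only mimics `session.append(flowFile, (out) ->` against an in-memory buffer; it is not the real ProcessSession API):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;

public class StreamingScanSketch {

    // Stands in for the flow file's content in this mock.
    static final ByteArrayOutputStream content = new ByteArrayOutputStream();

    // Mimics session.append(flowFile, out -> { ... }).
    interface AppendCallback {
        void process(OutputStream out) throws IOException;
    }

    static void append(AppendCallback callback) throws IOException {
        callback.process(content);
    }

    public static void main(String[] args) throws IOException {
        // Inside the handler, each row is appended as it arrives rather than
        // buffered in a list, so memory use stays flat regardless of row count.
        List<String> rows = List.of("{\"id\":\"row1\"}", "{\"id\":\"row2\"}");
        for (String rowJson : rows) {
            append(out -> out.write((rowJson + "\n").getBytes(StandardCharsets.UTF_8)));
        }
        System.out.print(content.toString(StandardCharsets.UTF_8));
    }
}
```

With this shape there is never a per-row list to flush, which is what makes the "hanging rows" bookkeeping unnecessary.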


---


[GitHub] nifi issue #2478: NIFI-4833 Add scanHBase Processor

2018-02-26 Thread bdesert
Github user bdesert commented on the issue:

https://github.com/apache/nifi/pull/2478
  
@bbende , I addressed all the comments. Thank you and let me know if you 
see more issues or have some recommendations/suggestions.


---


[GitHub] nifi issue #2478: NIFI-4833 Add scanHBase Processor

2018-02-21 Thread MikeThomsen
Github user MikeThomsen commented on the issue:

https://github.com/apache/nifi/pull/2478
  
@bbende @ijokarumawak +1 LGTM but it needs a second look. I tested it with 
HBase locally and it was able to work just fine with what I threw at it.


---


[GitHub] nifi issue #2478: NIFI-4833 Add scanHBase Processor

2018-02-17 Thread MikeThomsen
Github user MikeThomsen commented on the issue:

https://github.com/apache/nifi/pull/2478
  
I'll try to take a look this weekend.


---


[GitHub] nifi issue #2478: NIFI-4833 Add scanHBase Processor

2018-02-17 Thread bdesert
Github user bdesert commented on the issue:

https://github.com/apache/nifi/pull/2478
  
@MikeThomsen , are you available to re-review this PR? I have addressed your 
comment regarding the branch and the rest (except for labels, which can be 
added later in bulk for all the HBase-related processors).


---