I wanted to thank Matt and Mark for their assistance. I also wanted to
follow up with a final reply detailing how I got this to work, in case it
benefits anyone else down the road.

*The configuration of the RouteText processor that worked to extract a
specific line:*
Routing Strategy                     Route to 'matched' if line matches all conditions
Matching Strategy                    Satisfies Expression
Character Set                        UTF-8
Ignore Leading/Trailing Whitespace   true
Ignore Case                          false
Grouping Regular Expression          No value set
isHeader                             ${lineNo:equals(1)}

One observation about the RouteText solution: it appears to replace the
content of the flowfile with the matched line. So if you need to preserve
the incoming content of your flowfile for other purposes, or if you want
the match saved to an attribute, this may not be for you. I was unable to
find a way to get RouteText to direct the match to a flowfile attribute.
(I am using NiFi version 1.16.3. "Why a previous version, Jim, rather than
1.19 or the newly released 1.20?" Because I integrated some dependencies
that have only been tested through v1.16, a consideration I'll have to
tackle later.)

*The Groovy script I got to work, which uses a BufferedReader to extract
the same line and save it to a new attribute:*

import java.nio.charset.StandardCharsets

def header
def ff = session.get()
if (!ff) return
try {
    // Read the current flowfile content and copy it unchanged to the new content
    ff = session.write(ff, { inputStream, outputStream ->
        def bufferedReader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))
        def bufferedWriter = new BufferedWriter(new OutputStreamWriter(outputStream, StandardCharsets.UTF_8))

        // The header is the first line (null if the flowfile is empty)
        header = bufferedReader.readLine()

        int i = 0
        if (header != null) {
            bufferedWriter.write(header)
            bufferedWriter.newLine()
            i++
        }

        // Copy each remaining line to the outputStream until end of stream
        def line
        while ((line = bufferedReader.readLine()) != null) {
            bufferedWriter.write(line)
            bufferedWriter.newLine()
            i++
        }

        // By default, INFO doesn't show in the logs and WARN will appear in the processor bulletins
        log.warn("Wrote ${i} lines to output")

        bufferedReader.close()
        bufferedWriter.close()
    } as StreamCallback)

    // Only set the attribute if a header line was actually read
    if (header != null) {
        ff = session.putAttribute(ff, 'mdb.table.header', header)
    }
    session.transfer(ff, REL_SUCCESS)
} catch (Exception e) {
    log.error('Error occurred extracting header for table in mdb file', e)
    session.transfer(ff, REL_FAILURE)
}
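
For anyone who also needs the line count from the original question, here is a minimal stand-alone sketch of the same BufferedReader idea: one pass over the stream, keeping only the wanted line and a running count, so the whole file is never held in memory. The helper name scanLines and the sample data are my own inventions for illustration, not anything from NiFi or the Cookbook, and I have only reasoned through this outside of NiFi.

```groovy
import java.nio.charset.StandardCharsets

// Hypothetical helper (not part of NiFi): reads the stream once, keeps only
// line number `wantedLine` (1-based) and counts every line it sees.
def scanLines(InputStream inputStream, int wantedLine) {
    def reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))
    String wanted = null
    int total = 0
    String line
    while ((line = reader.readLine()) != null) {
        total++
        if (total == wantedLine) {
            wanted = line   // remember just this one line
        }
    }
    return [line: wanted, total: total]
}

// Stand-alone usage: an in-memory stream stands in for the flowfile content
def sample = "junk\ncol_a,col_b\n1,2\n3,4\n"
def result = scanLines(new ByteArrayInputStream(sample.getBytes(StandardCharsets.UTF_8)), 2)
println "header=${result.line} total=${result.total}"
```

Inside ExecuteScript, I would expect this to be callable from session.read(ff, { inputStream -> ... } as InputStreamCallback), which leaves the flowfile content untouched, with the session.putAttribute calls made after the callback returns. Attribute values are strings, so the count would need String.valueOf() or similar. Treat that wiring as an assumption to verify on your own NiFi version.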

Jim

On Fri, Feb 10, 2023 at 7:19 AM James McMahon <jsmcmah...@gmail.com> wrote:

> Ah - of course. I went overboard here. Just because I don't use the
> OutputStream for this purpose doesn't mean I can assume the method
> signature for the session.write() doesn't still require it. I'll fix this
> later tonight. Thank you both very much, Matt and Mark.
> Jim
>
> On Thu, Feb 9, 2023 at 10:44 PM Matt Burgess <mattyb...@gmail.com> wrote:
>
>> session.write() doesn’t take just an InputStream, it either takes both an
>> InputStream and OutputStream (if using a StreamCallback like you are) or
>> just an OutputStream (using an OutputStreamCallback, usually for source
>> processors that don’t have FlowFile input)
>>
>> Sent from my iPhone
>>
>> On Feb 9, 2023, at 9:34 PM, James McMahon <jsmcmah...@gmail.com> wrote:
>>
>> 
>> Mark, your RouteText blog worked perfectly. Thank you very much.
>>
>> Matt, I still want to get the BufferedReader working. I'm close. Here is
>> my code, with the error that results. I do not know what this error means.
>> Any thoughts?
>>
>> import org.apache.commons.io.IOUtils
>> import java.nio.charset.StandardCharsets
>> def ff=session.get()
>> if(!ff)return
>> try {
>>     // Here we are reading from the current flowfile content and writing
>> to the new content
>>     //ff = session.write(ff, { inputStream, outputStream ->
>>     ff = session.write(ff, { inputStream ->
>>         def bufferedReader = new BufferedReader(new
>> InputStreamReader(inputStream))
>>
>>         // Header is the first line...
>>         def header = bufferedReader.readLine()
>>         ff = session.putAttribute(ff, 'mdb.table.header', header)
>>
>>
>>         //def bufferedWriter = new BufferedWriter(new
>> OutputStreamWriter(outputStream))
>>         //def line
>>
>>         int i = 0
>>
>>         // While the incoming line is not empty, write it to the
>> outputStream
>>         //while ((line = bufferedReader.readLine()) != null) {
>>         //    bufferedWriter.write(line)
>>         //    bufferedWriter.newLine()
>>         //    i++
>>         //}
>>
>>         // By default, INFO doesn't show in the logs and WARN will
>> appear in the processor bulletins
>>         //log.warn("Wrote ${i} lines to output")
>>
>>         bufferedReader.close()
>>         //bufferedWriter.close()
>>     } as StreamCallback)
>>
>>     session.transfer(ff, REL_SUCCESS)
>> } catch (Exception e) {
>>      log.error('Error occurred extracting header for table in mdb file',
>> e)
>>      session.transfer(ff, REL_FAILURE)
>> }
>>
>> The ExecuteScript processor throws this error:
>> 02:30:38 UTC
>> ERROR
>> 4d6c3e21-a72e-16b2-6a7f-f5cadb351c0e
>>
>> ExecuteScript[id=4d6c3e21-a72e-16b2-6a7f-f5cadb351c0e] Error occurred 
>> extracting header for table in mdb file: groovy.lang.MissingMethodException: 
>> No signature of method: Script10$_run_closure1.doCall() is applicable for 
>> argument types: 
>> (org.apache.nifi.controller.repository.io.TaskTerminationInputStream...) 
>> values: 
>> [org.apache.nifi.controller.repository.io.TaskTerminationInputStream@5a2d22a7,
>>  ...]
>> Possible solutions: doCall(java.lang.Object), findAll(), findAll()
>>
>>
>>
>> On Thu, Feb 9, 2023 at 8:35 PM Mark Payne <marka...@hotmail.com> wrote:
>>
>>> James,
>>>
>>> Have a look at the RouteText processor. I wrote a blog post recently on
>>> using it:
>>> https://medium.com/cloudera-inc/building-an-effective-nifi-flow-routetext-5068a3b4efb3
>>>
>>> Thanks
>>> Mark
>>>
>>> Sent from my iPhone
>>>
>>> On Feb 9, 2023, at 8:06 PM, James McMahon <jsmcmah...@gmail.com> wrote:
>>>
>>> 
>>> My version of nifi does not have Range Sampling unfortunately.
>>> If I get the flowfile through a session as done in the Cookbook, does
>>> anyone know of an approach in Groovy to grab line N and avoid loading the
>>> entire CSV file into string variable *text*?
>>>
>>> On Thu, Feb 9, 2023 at 7:18 PM Matt Burgess <mattyb...@gmail.com> wrote:
>>>
>>>> I’m AFK ATM but Range Sampling was added into the SampleRecord
>>>> processor (https://issues.apache.org/jira/browse/NIFI-9814), the Jira
>>>> doesn’t say which version it went into but it is definitely in 1.19.1+. If
>>>> that’s available to you then you can just specify “2” as the range and it
>>>> will only return that line.
>>>>
>>>> For total record count without loading the whole thing into memory,
>>>> there’s probably a more efficient way but you could use ConvertRecord and
>>>> convert it from CSV to CSV and it should write out the “record.count”
>>>> attribute. I think some/most/all record processors write this attribute,
>>>> and they work record by record so they don’t load the whole thing into
>>>> memory. Even SampleRecord adds a record.count attribute but if you specify
>>>> one line the value will be 1 :)
>>>>
>>>> Regards,
>>>> Matt
>>>>
>>>>
>>>> On Feb 9, 2023, at 6:57 PM, James McMahon <jsmcmah...@gmail.com> wrote:
>>>>
>>>> 
>>>> Hello. I am trying to identify a header line and a data line count from
>>>> a flowfile that is in csv format.
>>>>
>>>> Most of us are familiar with Matt B's outstanding Cookbook series, and
>>>> I am trying to use that as my starting point. Here is my Groovy code:
>>>>
>>>> import org.apache.commons.io.IOUtils
>>>> import java.nio.charset.StandardCharsets
>>>> def ff=session.get()
>>>> if(!ff)return
>>>> try {
>>>>      def text = ''
>>>>      // Cast a closure with an inputStream parameter to
>>>> InputStreamCallback
>>>>      session.read(ff, {inputStream ->
>>>>           text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
>>>>           // Do something with text here
>>>>           // get header from the second line of the flowfile
>>>>           // set datacount as the total line count of the file - 2
>>>>           ...
>>>>           ff = session.putAttribute(ff, 'mdb.table.header', header)
>>>>           ff = session.putAttribute(ff, 'mdb.table.datarecords',
>>>> datacount)
>>>>      } as InputStreamCallback)
>>>>      session.transfer(flowFile, REL_SUCCESS)
>>>> } catch(e) {
>>>>      log.error('Error occurred identifying tables in mdb file', e)
>>>>      session.transfer(ff, REL_FAILURE)
>>>> }
>>>>
>>>> I want to avoid using that line in red (the IOUtils.toString call), because as Matt cautions in his
>>>> cookbook, our csv files are too large. I do not want to read in the entire
>>>> file to variable text. It's going to be a problem.
>>>>
>>>> How in Groovy can I cherry pick only the line I want from the stream
>>>> (line #2 in this case)?
>>>>
>>>> Also, how can I get a count of the total lines without loading them all
>>>> into text?
>>>>
>>>> Thanks in advance for your help.
>>>>
>>>>
