session.write() doesn’t take just an InputStream, it either takes both an InputStream and OutputStream (if using a StreamCallback like you are) or just an OutputStream (using an OutputStreamCallback, usually for source processors that don’t have FlowFile input)

Sent from my iPhone

On Feb 9, 2023, at 9:34 PM, James McMahon <jsmcmah...@gmail.com> wrote:


Mark, your RouteText blog worked perfectly. Thank you very much.

Matt, I still want to get the BufferedReader working. I'm close. Here is my code, with the error that results. I do not know what this error means. Any thoughts?

import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets
def ff=session.get()
if(!ff)return
try {
    // Here we are reading from the current flowfile content and writing to the new content
    //ff = session.write(ff, { inputStream, outputStream ->
    ff = session.write(ff, { inputStream ->
        def bufferedReader = new BufferedReader(new InputStreamReader(inputStream))

        // Header is the first line...
        def header = bufferedReader.readLine()
        ff = session.putAttribute(ff, 'mdb.table.header', header)
        

        //def bufferedWriter = new BufferedWriter(new OutputStreamWriter(outputStream))
        //def line

        int i = 0

        // While the incoming line is not empty, write it to the outputStream
        //while ((line = bufferedReader.readLine()) != null) {
        //    bufferedWriter.write(line)
        //    bufferedWriter.newLine()
        //    i++
        //}

        // By default, INFO doesn't show in the logs and WARN will appear in the processor bulletins
        //log.warn("Wrote ${i} lines to output")

        bufferedReader.close()
        //bufferedWriter.close()
    } as StreamCallback)

    session.transfer(ff, REL_SUCCESS)
} catch (Exception e) {
     log.error('Error occurred extracting header for table in mdb file', e)
     session.transfer(ff, REL_FAILURE)
}

The ExecuteScript processor throws this error:
02:30:38 UTC
ERROR
ExecuteScript[id=4d6c3e21-a72e-16b2-6a7f-f5cadb351c0e] Error occurred extracting header for table in mdb file: groovy.lang.MissingMethodException: No signature of method: Script10$_run_closure1.doCall() is applicable for argument types: (org.apache.nifi.controller.repository.io.TaskTerminationInputStream...) values: [org.apache.nifi.controller.repository.io.TaskTerminationInputStream@5a2d22a7, ...]
Possible solutions: doCall(java.lang.Object), findAll(), findAll()


On Thu, Feb 9, 2023 at 8:35 PM Mark Payne <marka...@hotmail.com> wrote:



James,

Have a look at the RouteText processor. I wrote a blog post recently on using it: https://medium.com/cloudera-inc/building-an-effective-nifi-flow-routetext-5068a3b4efb3

Thanks
Mark

Sent from my iPhone

On Feb 9, 2023, at 8:06 PM, James McMahon <jsmcmah...@gmail.com> wrote:


My version of nifi does not have Range Sampling unfortunately.
If I get the flowfile through a session as done in the Cookbook, does anyone know of an approach in Groovy to grab line N and avoid loading the entire CSV file into string variable text?

On Thu, Feb 9, 2023 at 7:18 PM Matt Burgess <mattyb...@gmail.com> wrote:
I’m AFK ATM but Range Sampling was added into the SampleRecord processor (https://issues.apache.org/jira/browse/NIFI-9814), the Jira doesn’t say which version it went into but it is definitely in 1.19.1+. If that’s available to you then you can just specify “2” as the range and it will only return that line.

For total record count without loading the whole thing into memory, there’s probably a more efficient way but you could use ConvertRecord and convert it from CSV to CSV and it should write out the “record.count” attribute. I think some/most/all record processors write this attribute, and they work record by record so they don’t load the whole thing into memory. Even SampleRecord adds a record.count attribute but if you specify one line the value will be 1 :)

Regards,
Matt


On Feb 9, 2023, at 6:57 PM, James McMahon <jsmcmah...@gmail.com> wrote:


Hello. I am trying to identify a header line and a data line count from a flowfile that is in csv format.

Most of us are familiar with Matt B's outstanding Cookbook series, and I am trying to use that as my starting point. Here is my Groovy code:

import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets
def ff=session.get()
if(!ff)return
try {
     def text = ''
     // Cast a closure with an inputStream parameter to InputStreamCallback
     session.read(ff, {inputStream ->
          text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
          // Do something with text here
          // get header from the second line of the flowfile
          // set datacount as the total line count of the file - 2 
          ...
          ff = session.putAttribute(ff, 'mdb.table.header', header)
          ff = session.putAttribute(ff, 'mdb.table.datarecords', datacount)
     } as InputStreamCallback)
     session.transfer(flowFile, REL_SUCCESS)
} catch(e) {
     log.error('Error occurred identifying tables in mdb file', e)
     session.transfer(ff, REL_FAILURE)
}

I want to avoid using that line in red, because as Matt cautions in his cookbook, our csv files are too large. I do not want to read in the entire file to variable text. It's going to be a problem.

How in Groovy can I cherry pick only the line I want from the stream (line #2 in this case)?

Also, how can I get a count of the total lines without loading them all into text?

Thanks in advance for your help.

Reply via email to