Re: How to split json subarrays and keep root

2020-09-28 Thread Matt Burgess
Jens,

Try ForkRecord [1] with "Mode" set to "Extract" and "Include Parent
Fields" set to "true", I think that does what you're looking to do.

Regards,
Matt

[1] 
https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.12.1/org.apache.nifi.processors.standard.ForkRecord/index.html

On Fri, Sep 25, 2020 at 1:48 AM Jens M. Kofoed  wrote:
>
> Hi
>
> I have a JSON document with an array which I would like to split and flatten.
> In my example below key4 is an array containing 2 documents. I need to split 
> the record based on each document in the key4 array, so I end up with 
> multiple records, where each new record has a copy of all keys except key4,
> which should be flattened into its own document.
> {
> key1:value1,
> key2:value2,
> key3: {
>   key3a:value3a,
>   key3b:value3b
> }
> key4: [
>{
>   key4a1:value4a1,
>   key4a2:value4a2
>   },
>   {
>   key4b1:value4b1,
>   key4b2:value4b2
>   }
> ]
> }
>
> Should be like this:
> Record 1
> {
> key1:value1,
> key2:value2,
> key3: {
>   key3a:value3a,
>   key3b:value3b
> }
> key4:{
>key4a1:value4a1,
>key4a2:value4a2
>   }
> }
>
> Record 2
> {
> key1:value1,
> key2:value2,
> key3: {
>   key3a:value3a,
>   key3b:value3b
> }
> key4:{
>key4b1:value4b1,
>key4b2:value4b2
>   }
> }
>
> Kind regards
> Jens M. Kofoed


Re: NiFi V1.9.2 Performance

2020-09-24 Thread Matt Burgess
Nathan,

If you have multiple JSON messages in one flow file, is it in one large
array, or a top-level JSON object with an array inside? Also are you trying
to transform each message or the whole thing (i.e. do you need to know
about more than one message at a time)? If you have a top-level array and
are transforming each element in the array, you might get better
performance out of JoltTransformRecord rather than JoltTransformJSON, as
the latter reads the entire file into memory as a single JSON entity. If
you have a top-level object then they will both read the whole thing in.

Regards,
Matt


On Thu, Sep 24, 2020 at 10:25 AM  wrote:

> Hi Mark,
>
>
>
> From what I can see (based on queues building up before the processor and
> basically emptying after it), it is the Jolt processor we have problems with.
> We have tried adding more concurrency, reducing the run schedule and
> increasing the run duration, but it didn't seem to resolve the high CPU load
> (~32 when processing at the rates described in my first email; when no
> traffic is processing it sits at 0.2).
>
>
>
> It could be the completely wrong way of diagnosing this! I’ve struggled to
> find information (Apart from your great videos) to assist in getting to the
> bottom of it.
>
>
>
> Kind Regards,
>
>
>
> Nathan
>
>
>
> *From:* Mark Payne [mailto:marka...@hotmail.com]
> *Sent:* 24 September 2020 15:12
> *To:* users@nifi.apache.org
> *Subject:* Re: NiFi V1.9.2 Performance
>
>
>
> Hey Nathan,
>
>
>
> A quick clarification - is the bottleneck / the slow point in your flow
> actually consuming from Kafka or Jolt? From your original message it sounds
> like the bottleneck may actually be the Jolt Transform processor?
>
>
>
> If the problem is in the ConsumeKafka processor, one thing you’ll want to
> look at is in the Settings tab, set the Yield Duration to “0 sec”. That can
> make a huge difference in performance from Kafka processors.
>
>
>
> Thanks
>
> -Mark
>
>
>
>
>
> On Sep 24, 2020, at 10:07 AM, nathan.engl...@bt.com wrote:
>
>
>
> Hi Bryan,
>
>
>
> Thanks for this. My understanding of the concurrent tasks was incorrect. I
> thought it was across the whole cluster, not per node.
>
>
>
> I did spend some time looking at the code for the demarcator as we had
> issues getting it batching. I think there may be a slight misunderstanding
> between my description and how it sounds.
>
>
>
> When I say an Empty string, the message demarcator isn’t blank. I have
> used the checkbox ‘Set Empty String’, which means the processor treats the
> field as Null (From memory). If I left the field empty (checkbox not
> selected), it was one Kafka message to one flow file, which was a massive
> bottleneck.
>
>
>
> I also seem to remember from when I looked at the code that the
> ConsumeKafkaRecord processors default the demarcator to null.
>
>
>
> Kind Regards,
>
>
>
> Nathan
>
>
>
> *From:* Bryan Bende [mailto:bbe...@gmail.com ]
> *Sent:* 24 September 2020 14:54
> *To:* users@nifi.apache.org
> *Subject:* Re: NiFi V1.9.2 Performance
>
>
>
> Regarding the batching, I would have to double check the code, but since
> you said the demarcator is empty string, I think that means it is not
> batching and is putting one message into one flow file. Basically, if a
> demarcator is not set then the batch size is ignored.
>
>
>
> Regarding the processors/tasks, let's take one topic with 11 partitions as
> an example. If you make a consumer processor for this topic with 1
> concurrent task, then you have 3 instances of this processor since you have
> a 3-node cluster, so you might end up with something like this...
>
>
>
> node 1 - ConsumeKafka
>
>   Task 1 - 4 partitions
>
>
>
> node 2 - ConsumeKafka
>
>   Task 1 - 4 partitions
>
>
>
> node 3 - ConsumeKafka
>
>   Task 1 - 3 partitions
>
>
>
> It may not be exactly like that, but just an example as to how it should
> be assigned.
>
>
>
> To add more parallelism you could then increase concurrent tasks up to
> maybe 4 and you get something like this...
>
>
>
> node 1 - ConsumeKafka
>
>   Task 1 - 1 partition
>
>   Task 2 - 1 partition
>
>   Task 3 - 1 partition
>
>   Task 4 - 1 partition
>
>
>
> node 2 - ConsumeKafka
>
>   Task 1 - 1 partition
>
>   Task 2 - 1 partition
>
>   Task 3 - 1 partition
>
>   Task 4 - 1 partition
>
>
>
> node 3 - ConsumeKafka
>
>   Task 1 - 1 partition
>
>   Task 2 - 1 partition
>
>   Task 3 - 1 partition
>
>   Task 4 - nothing
>
>
>
> If you go higher than 4 concurrent tasks you will just end up creating
> more consumers than partitions, and there is nothing to assign them.
>
>
>
>
>
> On Thu, Sep 24, 2020 at 9:30 AM  wrote:
>
> Hi Bryan,
>
>
>
> We have configured the processor to read in a maximum batch size of 2k
> messages, which does seem to result in more than one Kafka message per flow
> file.
>
>
>
> Completely understand on the load balancing; we tried several iterations
> of 1 task to one topic partition. However, we still found it to be loaded
> towards one specific node. I will try splitting it into multiple 

Re: Run Nifi in IntelliJ to debug?

2020-10-26 Thread Matt Burgess
Sorry, I misread the part where you wanted to run NiFi inside IntelliJ;
I was talking about running it externally (from the command line, e.g.)
and connecting the IntelliJ debugger. I haven't run NiFi itself
using IntelliJ, maybe someone else can chime in for that.

On Mon, Oct 26, 2020 at 12:03 PM Matt Burgess  wrote:
>
> Yes, that's a pretty common operation amongst NiFi developers. In
> conf/bootstrap.conf there's a section called Enable Remote Debugging
> and a commented-out line something like:
>
> java.arg.debug=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005
>
> You can remove the comment from that line and set things like the
> address to the desired port, whether to suspend the JVM until a
> debugger connects, etc. Then in IntelliJ you can create a new
> configuration of type Remote, point it at the port you set in the
> above line, and connect the debugger. It will then stop at breakpoints
> and you can do all the debugging stuff like add Watches, execute
> expressions (to change values at runtime), etc.
>
> Regards,
> Matt
>
> On Mon, Oct 26, 2020 at 11:52 AM Darren Govoni  wrote:
> >
> > Hi
> >Is it possible to run Nifi from inside IntelliJ with debugging such that 
> > I can hit the app from my browser and trigger breakpoints?
> >
> > If anyone has done this can you please share any info?
> >
> > Thanks in advance!
> > Darren
> >
> > Sent from my Verizon, Samsung Galaxy smartphone
> > Get Outlook for Android


Re: Run Nifi in IntelliJ to debug?

2020-10-26 Thread Matt Burgess
Yes, that's a pretty common operation amongst NiFi developers. In
conf/bootstrap.conf there's a section called Enable Remote Debugging
and a commented-out line something like:

java.arg.debug=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005

You can remove the comment from that line and set things like the
address to the desired port, whether to suspend the JVM until a
debugger connects, etc. Then in IntelliJ you can create a new
configuration of type Remote, point it at the port you set in the
above line, and connect the debugger. It will then stop at breakpoints
and you can do all the debugging stuff like add Watches, execute
expressions (to change values at runtime), etc.
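For example, to have the JVM pause on startup until a debugger attaches on
port 8000 instead of the default 5005 (standard JDWP options, adjust to
taste), the line would become:

java.arg.debug=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=8000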

Regards,
Matt

On Mon, Oct 26, 2020 at 11:52 AM Darren Govoni  wrote:
>
> Hi
>Is it possible to run Nifi from inside IntelliJ with debugging such that I 
> can hit the app from my browser and trigger breakpoints?
>
> If anyone has done this can you please share any info?
>
> Thanks in advance!
> Darren
>
> Sent from my Verizon, Samsung Galaxy smartphone
> Get Outlook for Android


Re: Get all available variables in the InvokeScriptedProcessor

2020-08-11 Thread Matt Burgess
Although this is an "unnatural" use of Groovy (and a conversation much
better suited for the dev list :), it is possible to get at a map of
defined variables (key and value). This counts on particular
implementations of the API and that there is no SecurityManager
installed in the JVM so Groovy ignores boundaries like private
classes.  In InvokeScriptedProcessor or ExecuteScript it would look
something like:

def varRegistry = context.procNode.variableRegistry
def varMap = [:] as Map
storeVariables(varMap, varRegistry)

The storeVariables method is just a parent-first recursive call to
fill your map with variables; this allows child registries to override
variables that were declared "above":

def storeVariables(map, registry) {
  if (!registry) return map
  def parent
  try {
    parent = registry.parent
  } catch (t) {
    // no parent accessor available, just grab this registry's variables
    map.putAll(registry.variableMap)
    return map
  }
  if (!parent) {
    map.putAll(registry.variableMap)
    return map
  }
  // fill in the parent's variables first...
  storeVariables(map, parent)
  // ...then let this (child) registry override them
  map.putAll(registry.variableMap)
  return map
}

It works because "context" happens to be a StandardProcessContext
instance, which has a private "procNode" member of type ProcessorNode,
which is an extension of AbstractComponentNode which has a
getVariableRegistry() method.
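If you then only want the variables whose key matches a prefix (as in the
original question), note that the keys of variableMap are VariableDescriptor
objects rather than plain Strings, so something like this (untested) should
work:

def abcVars = varMap.findAll { k, v -> k.name.startsWith('a.b.c') }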

It's definitely a hack so please use at your own risk :)

Regards,
Matt

On Tue, Aug 11, 2020 at 1:18 AM Saloni Udani  wrote:
>
> Thanks Andy, but with expression language I can only get values of
> attributes and not both key and value. In our case, the variable key also has
> some useful information.
>
> Thanks
>
> On Mon, Aug 10, 2020 at 10:32 PM Andy LoPresto  wrote:
>>
>> Those variables are available to be referenced via Expression Language in 
>> the flowfile attributes. They are not intended for direct programmatic 
>> access via code, so you don’t need to address them directly in your Groovy 
>> code.
>>
>> If you need to populate specific values at configuration time, you can 
>> define dynamic properties on the processor config and reference those 
>> directly in code (see any existing processor source for examples).
>>
>> Andy LoPresto
>> alopre...@apache.org
>> alopresto.apa...@gmail.com
>> He/Him
>> PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4  BACE 3C6E F65B 2F7D EF69
>>
>> On Aug 9, 2020, at 10:40 PM, Saloni Udani  wrote:
>>
>> Thanks Andy.
>>
>> By variables I meant NiFi process group variables.
>> 
>>
>> On Sat, Aug 8, 2020 at 12:39 AM Andy LoPresto  wrote:
>>>
>>> I think we need additional clarification on what you mean by “variables”. 
>>> If you are referring to actual Groovy variables, you can enumerate them 
>>> using the binding available in the context (see below). If you mean the 
>>> attributes available on a flowfile, you can access them similarly.
>>>
>>> Find all variables starting with prefix:
>>>
>>> def varsStartingWithABC = this.binding.variables.findAll { k,v ->
>>> k.startsWith("a.b.c") }
>>>
>>> Find all attributes starting with prefix:
>>>
>>> def attrsStartingWithABC = flowfile.getAttributes().findAll { k,v ->
>>> k.startsWith("a.b.c") }
>>>
>>>
>>>
>>> Andy LoPresto
>>> alopre...@apache.org
>>> alopresto.apa...@gmail.com
>>> He/Him
>>> PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4  BACE 3C6E F65B 2F7D EF69
>>>
>>> On Aug 7, 2020, at 2:11 AM, Saloni Udani  wrote:
>>>
>>> Hi,
>>> We use NiFi 1.5.0.
>>> Our use case is to get particular key-pattern variables (key and value) in
>>> the groovy InvokeScriptedProcessor. E.g. I want all variables whose key
>>> starts with "a.b.c". With this I can write generic logic for certain
>>> categories of variables for further use.
>>>
>>> Is there a way programmatically to get all variables with a certain key 
>>> pattern? Or for that matter is there a way programmatically to get all 
>>> available variables Map ? In NiFi 1.5.0 or further versions.
>>>
>>>
>>> Thanks
>>> Saloni Udani
>>>
>>>
>>


Re: Processor Extensibility

2020-07-07 Thread Matt Burgess
This is probably better suited for the dev list (not sure if you're
subscribed, but please do; BCC'ing users and moving to dev). The
implementations (components and their NARs) are not designed to be
subclassed for custom extensions outside the codebase. Can you
describe your use case (and custom processor)? If there's a common
reusable interface we can talk about moving it to an API NAR and such,
but I believe in general the guidance is to do the copy/paste if you
need code from the existing components in the codebase.

Regards,
Matt

On Tue, Jul 7, 2020 at 10:14 PM Eric Secules  wrote:
>
> Hello,
>
> I was wondering if there was a recommendation on how to extend the 
> functionality of nifi processors without forking the nifi repository. I'm 
> looking for a way to include a processor's nar in my project and extend from 
> it. I'd also like to be able to extend that processor's test suite so I can 
> leverage that. The "solution" I found (if you can call it that) was to copy
> the code from ValidateRecord.java into a new class and make the changes I
> wanted to.
>
> Thanks,
> Eric


Re: Hive_1_1 Processors and Controllers Missing in NiFi 1.11.4

2020-07-07 Thread Matt Burgess
Harsha,

There are two NARs associated with Hive components:
nifi-hive-services-api-nar, which has the Hive1_1ConnectionPool service
(actually an interface, but that's under the hood), and nifi-hive1_1-nar,
which has the processors that declare themselves as users of that interface
(and the actual implementations).

Did you keep the old Hive 1.1 NAR in the lib/ folder when you upgraded? I
would think if you removed the 1.9.2 NARs and used just the 1.11.4 NARs,
NiFi would only find the newer ones and automatically upgrade the existing
processors and controller services, but I haven't tried it myself. I suspect
if they are both present then NiFi will try to match the version of the
processors to the controller services and they won't match. Either you'd
need both 1.9.2 NARs (the services-api one and the processors one) or
neither; I recommend the latter so the components get upgraded to the latest
version and they match.

If you're doing a hot reload by dropping the NARs into the dynamic loading
location, you'll probably want to remove the old ones entirely, put the new
ones in lib/, and restart NiFi.

Regards,
Matt



On Tue, Jul 7, 2020 at 6:27 PM Sri Harsha Chavali <
sriharsha.chav...@outlook.com> wrote:

> Hi All,
>
> We recently upgraded from nifi 1.9.2 to 1.11.4 and are facing an issue
> with Hive 1_1 processors. When we were in 1.9.2 we manually deployed the
> "nifi-hive_1_1-nar-1.9.2.nar" file (per suggestion in email chain
> https://lists.apache.org/list.html?users@nifi.apache.org:lte=5y:SELECTHIVE_1_1_QL%20-%20NiFi%201.9.0%20Missing)
> in the extensions folder. Now after upgrading to 1.11.4 we notice that the
> nar file (nifi-hive_1_1-nar-1.11.4.nar) is missing in the tarball again so
> we had to manually add it again to the extensions folder.
>
> The issue is not with getting and adding the nar file to extensions as
> nifi automatically unpacks it and makes processors and controllers
> available. The issue is that we need to redo all the controllers and
> processors if we download the latest nifi-hive_1_1-nar-1.11.4.nar file. If
> we use the old nifi-hive_1_1-nar-1.9.2.nar we get the below error. The rest
> of the controllers/processors are automatically upgraded or are backward
> compatible. Could you please let me know if we need to redo all the
> processors or if there is an easy way to fix this issue by eliminating all
> the redo steps?
>
>
>
>
>
> Thank you,
> Harsha
>
> Sent from Outlook 
>


Re: Hive_1_1 Processors and Controllers Missing in NiFi 1.11.4

2020-07-07 Thread Matt Burgess
Harsha,

The nifi-hive-services-api-nar is included with the NiFi assembly/release
and includes the definition of what the Hive1_1ConnectionPool should be,
but the actual component is in the nifi-hive1_1-nar, which is not included
with the NiFi release (due to space concerns) and must be manually
downloaded.

Regards,
Matt


On Tue, Jul 7, 2020 at 9:01 PM Sri Harsha Chavali <
sriharsha.chav...@outlook.com> wrote:

> Hi Matt,
>
> Restarting NiFi did the trick. I removed the old nifi-hive_1_1-nar-1.9.2.nar
> from the extensions directory and "restarted" NiFi and all the processors
> and controllers automatically picked up the newer versions. Earlier I removed
> the old version and placed the new version of the nar in extensions folder
> but never restarted NiFi (assumed NiFi would dynamically drop the deleted
> nars like how it's dynamically loading) 
>
> But one thing to note:
> When I check the lib directory in the latest release I don't find the
> nifi-hive-1_1* at all. All we see is the below nars, which I assume are not
> 1_1 versions. So the 1_1 nars are completely missing in lib directory. We
> need to manually load them again.
>
>
>
> Thank you,
> Harsha
>
> Sent from Outlook <http://aka.ms/weboutlook>
> --
> *From:* Matt Burgess 
> *Sent:* Tuesday, July 7, 2020 7:05 PM
> *To:* users@nifi.apache.org 
> *Subject:* Re: Hive_1_1 Processors and Controllers Missing in NiFi 1.11.4
>
> Harsha,
>
> There are two NARs associated with Hive components,
> nifi-hive-services-api-nar which has the Hive1_1ConnectionPool service
> (actually an interface, but that's under the hood), and the
> nifi-hive1_1-nar which has the processors that declare themselves as users
> of that interface (and the actual implementations). Did you keep the old
> Hive 1.1 NAR in the lib/ folder when you upgraded? I would think if you
> removed the 1.9.2 NARs and used just the 1.11.4 NARs, NiFi would only find
> the newer ones and automatically upgrade the existing processors and
> controller services, but I haven't tried it myself. I suspect if they are
> both present then NiFi will try to match the version of the processors to
> the controller services and they won't match. Either you'd need both 1.9.2
> NARs (the services-api one and the processors one) or neither, I recommend
> the latter so the components get upgraded to the latest version and that
> they match. If you're doing a hot reload by dropping the NARs into the
> dynamic loading location, you'll probably want to remove the old ones
> entirely, put the new ones in lib/, and restart NiFi.
>
> Regards,
> Matt
>
>
>
> On Tue, Jul 7, 2020 at 6:27 PM Sri Harsha Chavali <
> sriharsha.chav...@outlook.com> wrote:
>
> Hi All,
>
> We recently upgraded from nifi 1.9.2 to 1.11.4 and are facing an issue
> with Hive 1_1 processors. When we were in 1.9.2 we manually deployed the
> "nifi-hive_1_1-nar-1.9.2.nar" file (per suggestion in email chain
> https://lists.apache.org/list.html?users@nifi.apache.org:lte=5y:SELECTHIVE_1_1_QL%20-%20NiFi%201.9.0%20Missing)
> in the extensions folder. Now after upgrading to 1.11.4 we notice that the
> nar file (nifi-hive_1_1-nar-1.11.4.nar) is missing in the tarball again so
> we had to manually add it again to the extensions folder.
>
> The issue is not with getting and adding the nar file to extensions as
> nifi automatically unpacks it and makes processors and controllers
> available. The issue is that we need to redo all the controllers and
> processors if we download the latest nifi-hive_1_1-nar-1.11.4.nar file. If
> we use the old nifi-hive_1_1-nar-1.9.2.nar we get the below error. The rest
> of the controllers/processors are automatically upgraded or are backward
> compatible. Could you please let me know if we need to redo all the
> processors or if there is an easy way to fix this issue by eliminating all
> the redo steps?
>
>
>
>
>
> Thank you,
> Harsha
>
> Sent from Outlook <http://aka.ms/weboutlook>
>
>


Re: Error with FetchFTP when filename has non-ASCII charachters

2020-07-14 Thread Matt Burgess
Luca,

I'm guessing the issue is the same as the one in [1] but it just wasn't
fixed for FetchFTP. Please feel free to write an improvement Jira [2] to
add this to FetchFTP as well.

Regards,
Matt

[1] https://issues.apache.org/jira/browse/NIFI-4137
[2] https://issues.apache.org/jira/browse/NIFI


On Tue, Jul 14, 2020 at 9:50 AM Luca Giovannini <
luca.giovann...@dedagroup.it> wrote:

>
>
> When FetchFTP tries to fetch a file with a name containing non-ASCII
> characters (e.g. “Xml_APE001_unitàimmobiliareguarda.xml”), the
> process succeeds but the corresponding flowfile is empty.
>
> Is there a way to make the FetchFTP processor succeed for real and really
> fetch the file?
>
>
>
> Thank you very much,
>
>
>
> Luca
>
>
>
>
>
> *Luca Giovannini*
> Information Systems Analyst
> *Dedagroup Public Services*
>
> www.linkedin.com/in/lucagio/
>
> T +39.051.278.928 | M +39.347.799.3183 | VoIP 951.128
> Dedagroup Public Services Srl – Sede di Casalecchio di Reno, Via del
> Lavoro 67
>
> www.dedagroup.it/public-services
>
>
>
> 
>
>
>
> The information included in this e-mail and any attachments are
> confidential and may also be privileged. If you are not the correct
> recipient, you are kindly requested to notify the sender immediately, to
> cancel it and not to disclose the contents to any other person.
>


Re: Enrichment of record data with a REST API

2020-06-29 Thread Matt Burgess
Mike,

I think you can use LookupRecord with a RestLookupService to do this.
If it's missing features or it otherwise doesn't work for your use
case, please let us know and/or write up whatever Jiras you feel are
appropriate.
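As a rough sketch (names here are illustrative and from memory, so check the
docs), LookupRecord would be configured with the RestLookupService as its
Lookup Service, a Result RecordPath telling it where to put the response, and
dynamic properties mapping fields of each record to the lookup coordinates:

LookupRecord:
  Record Reader: JsonTreeReader
  Record Writer: JsonRecordSetWriter
  Lookup Service: RestLookupService
  Result RecordPath: /enrichment
  (dynamic) id: /customerId   <- coordinate key : RecordPath into the record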

Regards,
Matt

On Mon, Jun 29, 2020 at 4:56 PM Mike Thomsen  wrote:
>
> Does anyone know a good pattern using the Record API to enrich a data set 
> record by record with a REST API?
>
> Thanks,
>
> Mike


Re: Route Attribute - Database down

2020-06-11 Thread Matt Burgess
Although the error attribute can help as a workaround, counting on a
text value is probably not the best option (although it's pretty much
all we have for now). I wrote up NIFI-7524 [1] to add a "retry"
relationship to ExecuteSQL like we have for PutSQL and
PutDatabaseRecord. It would route things like "Connection refused" to
retry rather than failure.
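In the meantime, a RouteOnAttribute property along these lines (expression
untested, attribute name taken from your example) should catch the connection
failures and let you loop them back for retry:

retry.db.down: ${executesql.error.message:contains('Connection refused')}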

Regards,
Matt

[1] https://issues.apache.org/jira/browse/NIFI-7524

On Thu, Jun 11, 2020 at 2:49 PM KhajaAsmath Mohammed
 wrote:
>
> Yes , you are right Luis. It comes in executesql processor.
>
> Sent from my iPhone
>
> On Jun 11, 2020, at 11:41 PM, Luis  wrote:
>
> 
>
> HI,
>
> I have done that with PostgreSQL and MSSQL, and it comes in the answer read 
> by ExecuteSQL.
>
> Maybe I am not understanding your question. If you like please send more 
> details.
>
> LC
>
>
>
> El jue, 11-06-2020 a las 12:11 -0500, KhajaAsmath Mohammed escribió:
>
> Hi,
>
> How can I capture the execution error on ExecuteSQL and route it to a
> different queue when there is a failure on the database?
>
> executesql.error.message = 'com.sap.db.jdbc.exceptions.JDBCDriverException: 
> SAP DBTech JDBC: Cannot connect to jdbc:sap://hana-xx:30041 [Cannot 
> connect to host hanaxxx:30041 [Connection refused (Connection refused)], 
> -813.].'
>
> Thanks,
> Asmath


Re: Retry logic for rest api - NIFI

2020-07-30 Thread Matt Burgess
Asmath,

InvokeHttp routes the original flowfile to a number of different
relationships based on things like the status code. For example if
you're looking for a 2xx code but want to retry on that for some
reason, you'd use the "Original" relationship. If you want a retryable
code (5xx) you can use the "Retry" relationship, and so on. If you
need a specific code, you can use RouteOnAttribute to match the
"invokehttp.status.code" attribute to the code(s) you want, and all
that match can be sent back to the original InvokeHttp processor.

That's a simpler pattern but they can get more complex. For example
you can start the flowfile with an attribute "retries" set to 5, and
before you send the flow file back to InvokeHttp, you'd decrement the
counter with UpdateAttribute and then perhaps drop the flowfile or
send it to some other mechanism after retries becomes zero.  Also you
could put a delay in the flow so you're not retrying the same thing as
fast as possible (that could constitute a Denial-Of-Service attack on
the HTTP endpoint you're trying to reach). I can't remember which
relationships from InvokeHttp penalize the flowfile, so there's a
chance that the processor will handle the delay for you (see the
User's Guide section on penalized flowfiles).
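As a rough sketch of the counter approach (attribute and property names are
just examples): seed the flowfile with UpdateAttribute setting retries = 5,
then on the retry path decrement it and route out the exhausted ones:

UpdateAttribute:    retries = ${retries:minus(1)}
RouteOnAttribute:   exhausted = ${retries:le(0)}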

Regards,
Matt

On Thu, Jul 30, 2020 at 3:38 PM KhajaAsmath Mohammed
 wrote:
>
> can you please let me know how to use this in NIFI.
>
> On Thu, Jul 30, 2020 at 11:19 AM Otto Fowler  wrote:
>>
>> nipyapi does something like that: 
>> https://github.com/Chaffelson/nipyapi/blob/164351ee2d92f8c4a75989310662bbad0f7bafc4/nipyapi/utils.py#L210
>>
>>
>>
>>
>> On July 30, 2020 at 11:22:29, KhajaAsmath Mohammed (mdkhajaasm...@gmail.com) 
>> wrote:
>>
>> Hi,
>>
>> I am looking for some information on how to do retry logic on a REST API
>> until we get a specific status code. Please let me know if you have any
>> approach/templates for this.
>>
>> Thanks,
>> Asmath


Re: schema index out of range

2020-12-02 Thread Matt Burgess
Satish,

Can you provide some sample data that causes this issue?

Thanks,
Matt

On Wed, Dec 2, 2020 at 5:18 AM naga satish  wrote:
>
> Hi all, in record readers (CSVReader), when the schema strategy is set to
> Infer Schema, it sometimes keeps giving an error. The error states that the
> index of "student_name" is 1 but the CSV record has only 1 value. What is
> this error and what causes it?
>
> thanks,
> satish.


Re: horizontal merge

2020-11-17 Thread Matt Burgess
Geoffrey,

Where are the two flowfiles coming from? This use case is often
handled in NiFi using LookupRecord with one of the LookupService
implementations (REST, RDBMS, CSV, etc.). We don't currently have a
mechanism (besides scripting) to do enrichment/lookups from flowfiles.

For your script, you can do session.get(2) and then check the size of
the returned List. If it is less than 2, you can rollback the session
and return (possibly yielding first if you don't want to check again
rapidly).
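Roughly (untested), that part of onTrigger would look like:

def session = sessionFactory.createSession()
def flowFiles = session.get(2)
if (flowFiles.size() < 2) {
    // not enough flowfiles queued yet; put back whatever we grabbed and try later
    session.rollback()
    context.yield()  // avoid spinning while waiting for the second flowfile
    return
}
def flowFile1 = flowFiles[0]
def flowFile2 = flowFiles[1]
// ... read both, write the merged content to a new flowfile, transfer it, then:
session.commit()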

Regards,
Matt

On Tue, Nov 17, 2020 at 4:13 PM Greene (US), Geoffrey N
 wrote:
>
> I’m trying to glue two flow files together HORIZONTALLY.  That is,
>
> Flowfile1
>
> ID,STARTINGLETTER
>
> 1,A
>
> 2,B
>
>
>
> And flowfile2:
>
> ID, WORD
>
> 1,Apple
>
> 2, Ball
>
> 3, Cat
>
>
>
> I want it to become:
>
> ID, STARTINGLETTER, WORD
>
> 1,A,Apple
>
> 2,B,Ball
>
> 3,,Cat
>
>
>
> The only way I’ve been able to figure out how to do this is to write a custom
> “InvokeGroovyProcessor” that takes two flowfiles, reads them both, and then
> concatenates them.
>
>
>
> I’m having trouble figuring out how to pop TWO flow files off the queue, (The 
> order doesn’t really matter for now), and write one. I’ve tried
>
>
>
> @Override
>
> void onTrigger(ProcessContext context, ProcessSessionFactory 
> sessionFactory) throws ProcessException {
>
> try {
>
> def session = sessionFactory.createSession()
>
>  while (session.getQueueSize() != 2)
>
>  {
>
>  Thread.sleep(1000)
>
>  log.debug("sleeping")
>
>   }
>
>  // we never get here
>
>   log.debug("found two flow files")
>
>   // get BOTH flowfiles
>
>  def flowFileList = session.get(2)
>
>   def flowFile1 = flowFileList.get(0)
>
>   def flowFile2 = flowFileList.get(1)
>
>// now do the glue
>
>
>
> Or Is there a better way?
>
> Thanks!
>
>
>
>
>
>
>
> Geoffrey Greene
>
> Senior Software Ninjaneer
>
> (703) 414 2421
>
> The Boeing Company
>
>


Re: PrometheusReportingTask Metrics

2020-11-02 Thread Matt Burgess
David,

The documentation for the metrics is in the "help" section of the datapoint
definition; if you hit the REST endpoint you can see the descriptions. They
are also listed in the code [1].

Regards,
Matt

[1]
https://github.com/apache/nifi/blob/main/nifi-nar-bundles/nifi-extension-utils/nifi-prometheus-utils/src/main/java/org/apache/nifi/prometheus/util/NiFiMetricsRegistry.java



On Wed, Oct 28, 2020 at 8:28 PM Ritch, David  wrote:

> Thank you for sharing that.  We were able to get it to collect statistics
> - we're just not sure exactly what each statistic indicates.
>
> On Wed, Oct 28, 2020 at 8:18 PM Pavel S 
> wrote:
>
>> I don't have an answer to your question. However I just wanted to share
>> something related to this. We have tried using prometheus reporting task on
>> my project and I couldn't get it to work. I have ended up creating my own
>> application that generates prometheus metrics.
>>
>> On Wed, Oct 21, 2020, 7:44 PM Ritch, David  wrote:
>>
>>> Is there a description of each of the metrics provided by
>>> the PrometheusReportingTask available somewhere?
>>>
>>> Thank you!
>>>
>>> --
>>>
>>> David Ritch
>>> Principal Software Engineer
>>>
>>> WaveStrike/Analytics Capabilities Division
>>>
>>>
>>>
>>> 
>>>
>>> 300 Sentinel Dr. Ste. 210
>>>
>>> Annapolis Junction, MD 20701
>>> Email dri...@novetta.com
>>>
>>> Office (443) 661-4810x1173
>>> Mobile (443) 718-9327
>>>
>>
>
> --
>
> David Ritch
> Principal Software Engineer
>
> WaveStrike/Analytics Capabilities Division
>
>
>
> 
>
> 300 Sentinel Dr. Ste. 210
>
> Annapolis Junction, MD 20701
> Email dri...@novetta.com
>
> Office (443) 661-4810x1173
> Mobile (443) 718-9327
>


Re: GetFile with putsql/executesql

2020-10-28 Thread Matt Burgess
Asmath,

GetFile doesn't take an input connection, but if the attribute is
going to contain a file to ingest, you can use FetchFile instead. To
get an attribute from a database, take a look at LookupAttribute with
a SimpleDatabaseLookupService. Depending on the query you were going
to execute, you may be able to instead look up a value given a key in
your incoming flow file, and the resulting value from the database
will be put in an attribute for use downstream.
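A rough sketch of that flow (table/column/property names here are placeholders
and from memory, so verify against the docs):

SimpleDatabaseLookupService:
  Database Connection Pooling Service: your DBCPConnectionPool
  Table Name: file_registry
  Lookup Key Column: job_id
  Lookup Value Column: file_path

LookupAttribute:
  Lookup Service: the service above
  (dynamic) file.to.fetch: ${job.id}   <- attribute to add : key to look up

FetchFile:
  File to Fetch: ${file.to.fetch}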

Regards,
Matt

On Wed, Oct 28, 2020 at 3:07 PM KhajaAsmath Mohammed
 wrote:
>
> Hi,
>
> I have a scenario where I need to get a value from the database and pass it as
> an attribute for GetFile in subsequent processors.
>
> GetFile >> ExecuteSQL/PutSQL >> get the value from the output of the SQL and
> assign it to an attribute >> pass the attribute value to GetFile.
>
> Any help please ?
>
> Thanks,
> Asmath
>


Re: Dynamic Attribute Naming

2021-01-06 Thread Matt Burgess
Eric,

I don't believe it's possible in NiFi per se, because you'd have to
set it via a property, and properties have unique and static names, so
EL is not evaluated on the property name itself. However, you can use
Groovy with ExecuteScript to do this; check out [1] under the recipe
"Add an attribute to a flow file".
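A minimal Groovy sketch for ExecuteScript (assuming the flowfile already has
a fragment.index attribute and you just want some value stored under the
dynamically-built name):

def flowFile = session.get()
if (!flowFile) return
// build the attribute name from another attribute's value
def attrName = "MyAttr.${flowFile.getAttribute('fragment.index')}".toString()
flowFile = session.putAttribute(flowFile, attrName, 'some value')
session.transfer(flowFile, REL_SUCCESS)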

Regards,
Matt

[1] 
https://community.cloudera.com/t5/Community-Articles/ExecuteScript-Cookbook-part-1/ta-p/248922

On Tue, Jan 5, 2021 at 2:44 PM Eric Secules  wrote:
>
> Hello,
>
> I was wondering if it's possible to name an attribute based on an EL 
> statement like
>
> MyAttr.${fragment.index}
>
> Then when flow files are merged back together all the attributes are 
> preserved.
>
> Thanks,
> Eric


Re: Issue when extracting a large dataset into CSV files

2021-05-10 Thread Matt Burgess
Vibath,

What is the "Fetch Size" property set to? It looks like PostgreSQL
will load all results if Fetch Size is set to zero [1]. Try setting it
to 1 or something like that, whatever doesn't use too much memory
but doesn't slow down the performance too much.

Regards,
Matt

[1] https://jdbc.postgresql.org/documentation/head/query.html

On Mon, May 10, 2021 at 3:41 AM Vibhath Ileperuma
 wrote:
>
> Hi Vijay,
>
> Even though that property is set to a positive value, the same issue happens.
> It seems like NiFi first loads all the rows into RAM and writes to multiple
> files when that property is set to a non-zero value.
> On the other hand, if this property is set to a very small value, a large
> number of flow files can be generated at once.
>
> Thank you.
> Best regards,
> Vibhath
>
> On Mon, 10 May 2021, 9:00 am Vijay Chhipa,  wrote:
>>
>> Hi Vibhath,
>>
>> There is this  property on the processor
>>
>> Max Rows Per Flow File
>>
>>
>> Per docs:  If the value specified is zero, then all rows are returned in a 
>> single FlowFile.
>> Which seems to be what is happening in your case.
>>
>>
>>
>> On May 9, 2021, at 12:11 PM, Vibhath Ileperuma  
>> wrote:
>>
>> executeSQLRecord
>>
>>


Re: Invalidating InvokeScriptedProcessors when Script Files changes

2021-07-06 Thread Matt Burgess
Dirk,

We could look at adding a FileWatcher or something to
InvokeScriptedProcessor, but I doubt we'd want to allow re-evaluating
the script on the fly. Maybe we would just set a flag indicating a
change was detected, and the next time the processor is started (or
the script would otherwise be evaluated) we'd reload the file contents
at that point.

IIRC, using Script File was originally intended for "finished" scripts
to be located on each node of a cluster (or shared), so the sysadmins
could control the file rather than the flow designers. If you're
updating/testing scripts, I recommend copying the contents to Script
Body until it works the way you want, then copying the script back to
the file.

Regards,
Matt

On Thu, Jul 1, 2021 at 9:30 PM Dirk Arends  wrote:
>
> Hi All,
>
> I have a query regarding InvokeScriptedProcessors and the best way to 
> invalidate a script context. I'm using a number of InvokeScriptedProcessors 
> with a “Script File” set. Each time I update the script at that path, 
> restarting the InvokeScriptedProcessor does not cause the updated script to 
> be re-evaluated.
>
> From reading the InvokeScriptedProcessor implementation I can see that the 
> script will be re-evaluated when one of the Script Engine, Script File 
> (path), Script Body, or Module Directory properties are modified. This is 
> sufficient when using “Script Body”, but in the case of a Script File the 
> properties don’t change and therefore the script is not re-evaluated.
>
> My current workaround is to:
>
> 1. Stop the processor
> 2. Edit the Script Body and add a character
> 3. Apply changes
> 4. Edit the Script Body and remove the character
> 5. Apply changes
> 6. Start the processor again
> * I could use any of the 4 properties but Script Body has been the easiest 
> since it's empty.
>
> This obviously isn't ideal and isn't something I'm keen to integrate in a 
> deployment process (where my current thinking is to make NiFi API requests to 
> mimic the process).
>
> Is anyone able to suggest a better solution to this problem?
>
> Regards,
>
> --
> Dirk Arends


Re: Issue with GenerateTableFetch Processor

2021-02-10 Thread Matt Burgess
John,

It should be generating multiple queries with OFFSET; I tried to
reproduce this in a unit test (using Derby, not MySQL) and everything
looked fine. I ran it once with 3 rows and a partition size of 2 and got
the expected 2 flowfiles (one with 2 rows and one with 1). Then I added 6
rows and ran again (this simulates a larger number of rows than the
partition size coming in before the processor gets run again, 9 min in
your case). I got the expected 3 flowfiles out with the correct SQL:

SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 2 AND ID <= 8 ORDER BY ID
FETCH NEXT 2 ROWS ONLY
SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 2 AND ID <= 8 ORDER BY ID
OFFSET 2 ROWS FETCH NEXT 2 ROWS ONLY
SELECT * FROM TEST_QUERY_DB_TABLE WHERE ID > 2 AND ID <= 8 ORDER BY ID
OFFSET 4 ROWS FETCH NEXT 2 ROWS ONLY

I can try it on MySQL to see if it makes any difference, but I
wouldn't think so. In any case it might make sense to tune the
partition size and run schedule to better characterize the volume and
rate of rows coming in, such as setting the run schedule to something
like 1 minute.

Regards,
Matt


On Wed, Feb 10, 2021 at 3:18 PM John W. Phillips
 wrote:
>
> I’m having an issue with the GenerateTableFetch Processor, and I wanted to
> ask for some insight into whether this is a bug or expected behavior.  Using
> NiFi 1.12.1 I have a MySQL table with 1M+ rows, and I have a
> GenerateTableFetch processor with a `maximum-value column` and
> `partition-size` set to 25000 and a `run schedule` of 9 minutes.  When the
> etl starts up I get a sequence of queries for the existing 1M+ rows like
> this example
> `SELECT … ORDER BY maxvalcolumn LIMIT 25000 OFFSET 375000`.
>
> The on 9 minutes intervals I get queries like
> `SELECT … FROM ... WHERE maxvalcolumn > … AND maxvalcolumn <= … ORDER BY
> maxvalcolumn LIMIT 25000`
>
> The issue is that I see only 1 query per 9 minutes with a `LIMIT 25000`, so
> if my table accumulates more than 25000 rows in 9 minutes the `LIMIT 25000`
> term simply drops the additional rows and they are passed up.  Does the
> GenerateTableFetch delta copy generate any additional queries with the
> `OFFSET` term?  I’m not sure if there’s a configuration where I can get
> multiple queries using the `OFFSET` term in the 9 minute interval, or if I
> can have the query generated without the `LIMIT 25000` term.
>
> Thanks,
> John
>
>
>
> --
> Sent from: http://apache-nifi-users-list.2361937.n4.nabble.com/


Re: Upsert logic with SQLServer - NIFI Putdatabase

2021-03-11 Thread Matt Burgess
Asmath,

Upsert in SQL Server (without NiFi) can be difficult, and even
error-prone if concurrency is needed [1]. I suspect that's why it
hasn't been attempted in PutDatabaseRecord (well, via the MSSQL
adapter(s)) as of yet. I haven't tried it without creating a procedure
so I'm not sure if the MSSQL database adapter can create a standalone
statement for MERGE or "try INSERT catch(error) then UPDATE".
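For reference, the MERGE form (outside of NiFi, with made-up table/column
names) looks something like this in T-SQL:

MERGE INTO dbo.target AS t
USING (VALUES (?, ?)) AS s (id, val)
    ON t.id = s.id
WHEN MATCHED THEN
    UPDATE SET t.val = s.val
WHEN NOT MATCHED THEN
    INSERT (id, val) VALUES (s.id, s.val);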

Other DBs such as MySQL and PostgreSQL have easier ways to achieve
this so NiFi has been able to support Upsert for those DBs.

Rather than deleting the row, maybe you could catch any INSERT errors
and route them back to the processor using UPDATE instead?

Regards,
Matt

[1] 
https://michaeljswart.com/2017/07/sql-server-upsert-patterns-and-antipatterns/

On Thu, Mar 11, 2021 at 3:18 PM KhajaAsmath Mohammed
 wrote:
>
> Hi,
>
> I am looking for some help on how to deal with upserts/updates on SQL Server 
> with NIFI.
>
> I get a flow file where the records are updated. Primary keys are already
> defined on the table.
>
> I don't want to try the logic of delete and insert, is there a way to handle 
> upsert automatically with this approach? This can be present for any table 
> and should be dynamic.
>
> Thanks,
> Asmath


Re: Issue with QueryRecord failing when data is missing

2021-02-25 Thread Matt Burgess
Jens,

What is the Schema Access Strategy set to in your CSVReader? If "Infer
Schema" or "Use String Fields From Header", the setting of "Treat
First Line As Header" should be ignored as those two options require a
header be present anyway. If you know the schema ahead of time you
could set it in the CSVReader rather than inferring it.

For "Infer Schema", there's a bug where the inferred schema is empty
because we don't have any records from which to infer the types of the
fields (even though the field names are present). I wrote up NIFI-8259
[1] to infer the types as strings when no records are present.

As a workaround you could filter out any FlowFiles that have no
records, either by using CountText or the 'record.count' attribute if
it has been set, into a RouteOnAttribute. Alternatively you could
emulate what NIFI-8259 is going to do by using "Use String Fields From
Header" in your CSVReader, but in that case you might need a CAST(colC
as BOOLEAN) in your SQL since populated FlowFiles could have the
correctly inferred schema where empty FlowFiles (or if "Use String
Fields From Header" is set) will think colC is a string rather than a
boolean. The CAST should work in both cases but I didn't try it.
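For the filtering route, a RouteOnAttribute property along these lines
(assuming record.count has been set upstream) would do it:

has.records: ${record.count:gt(0)}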

Regards,
Matt

[1] https://issues.apache.org/jira/browse/NIFI-8259

On Thu, Feb 25, 2021 at 1:56 AM Jens M. Kofoed  wrote:
>
> Hi all
>
> I have an issue with using QueryRecord to query CSV files. Currently I'm
> running NiFi version 1.12.1, but I also tested this in version 1.13.0.
> If my incoming CSV file only has a header line and no data, it fails.
>
> My querying statement looks like this: SELECT colA FROM FLOWFILE WHERE colC = 
> 'true'
>
> Changes made to the CSVReader:
> Treat First Line as Header = true
>
> Changes made to the CSVRecordSetWriter:
> Include Header Line = false
> Record Separator = ,
>
> Here are 2 sample data. The first one works as expected, but sample 2 gives 
> errors
> Sample 1:
> colA,colB,colC
> data1A,data1B,true
> data2A,data2B,false
> data3A,data3B,true
>
> Outcome: data1A,data3A,
>
> Sample 2:
> colA,colB,colC
>
> Error message:
> QueryRecord[id=d7c38f75-0177-1000--f694dd96] Unable to query 
> StandardFlowFileRecord[uuid=74a71c6e-3d3f-406c-92af-c9e4e27d6d69,claim=StandardContentClaim
>  [resourceClaim=StandardResourceClaim[id=1614232293848-3, 
> container=Node01Cont01, section=3], offset=463, 
> length=14],offset=0,name=74a71c6e-3d3f-406c-92af-c9e4e27d6d69,size=14] due to 
> java.sql.SQLException: Error while preparing statement [SELECT colA FROM 
> FLOWFILE WHERE colC = true]: 
> org.apache.nifi.processor.exception.ProcessException: java.sql.SQLException: 
> Error while preparing statement [SELECT colA FROM FLOWFILE WHERE colC = true]
>
> Is this a bug?
>
> kind regards
> Jens M. Kofoed


Re: Stopping processor after MAX number of retries

2021-03-01 Thread Matt Burgess
There's an example template on the Example Dataflow Templates page [1]
called Retry_Count_Loop.xml [2]; not sure what components it uses,
though.

Regards,
Matt

[1] https://cwiki.apache.org/confluence/display/NIFI/Example+Dataflow+Templates
[2] 
https://cwiki.apache.org/confluence/download/attachments/57904847/Retry_Count_Loop.xml?version=1=1433271239000=v2

On Mon, Mar 1, 2021 at 1:07 PM James McMahon  wrote:
>
> Any recommended examples of successful DistributedMapCache implementations?
>
> On Fri, Feb 26, 2021 at 11:30 AM Andrew Grande  wrote:
>>
>> I saw it several times and I have a strong conviction this is an 
>> anti-pattern. A dataflow must not mess with start/stop state of processors 
>> or process groups.
>>
>> Instead, a flow is always running and one puts a conditional check to either 
>> not get the data in or reroute/deny it.
>>
>> If you need to coordinate between processors, use wait/notify.
>>
>> If you need to coordinate across nodes, consider DistributedMapCache with a 
>> variety of implementations.
>>
>> Finally, can use external stores directly if need to coordinate with other 
>> systems.
>>
>> These become part of the flow design.
>>
>> Andrew
>>
>> On Fri, Feb 26, 2021, 7:42 AM Tomislav Novosel 
>>  wrote:
>>>
>>> Hi guys,
>>>
>>>
>>>
>>> I want to stop the processor after exceeding the maximum number of retries.
>>>
>>> For that I'm using the RetryFlowFile processor; after 5 retries, it routes
>>> the flowfile to retries_exceeded.
>>>
>>>
>>>
>>> When that kicks in, I want to stop the processor which was retried 5 times.
>>>
>>>
>>>
>>> What is the best approach? I have a few options:
>>>
>>>
>>>
>>> Execute shell script which sends request to nifi-api to set processor state 
>>> to STOPPED
>>> Put InvokeHTTP processor to send request
>>>
>>>
>>>
>>> The downside is, what if processor-id changes, e.g. deploying to another 
>>> env or nifi restart, not sure about that.
>>>
>>> Also, it is nifi cluster with authentication and SSL, so it complicates the 
>>> things.
>>>
>>>
>>>
>>> Maybe someone has much simpler approach, with backpressure or something.
>>>
>>>
>>>
>>> Regards,
>>>
>>> Tom


Re: Some SimpleRecordSchema questions

2021-03-01 Thread Matt Burgess
Geoffrey,

In general you won't need to create your own DataType objects; instead
you can use the RecordFieldType methods such as
RecordFieldType.ARRAY.getArrayDataType(DataType elementType, boolean
elementsNullable). So for an array of ints:

myRecordFields.add(new RecordField("allMyIntegers",
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.INT.getDataType(),
true)))

If I misunderstood what you're trying to do please let me know and
I'll help where I can.
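For the array-of-records case you describe, a rough Groovy sketch (untested,
field names taken from your example) would be:

import org.apache.nifi.serialization.SimpleRecordSchema
import org.apache.nifi.serialization.record.MapRecord
import org.apache.nifi.serialization.record.RecordField
import org.apache.nifi.serialization.record.RecordFieldType

// inner schema: a single "number" field
def oneValueSchema = new SimpleRecordSchema(
        [new RecordField('number', RecordFieldType.INT.getDataType())])

// outer schema: one field holding an array of those records
def arrayType = RecordFieldType.ARRAY.getArrayDataType(
        RecordFieldType.RECORD.getRecordDataType(oneValueSchema))
def outerSchema = new SimpleRecordSchema([new RecordField('numbers', arrayType)])

// building a value: array elements are themselves MapRecords
def rec = new MapRecord(outerSchema, [numbers: [
        new MapRecord(oneValueSchema, [number: 1]),
        new MapRecord(oneValueSchema, [number: 2])] as Object[]])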

Regards,
Matt


On Fri, Feb 26, 2021 at 7:04 PM Greene (US), Geoffrey N
 wrote:
>
> I’m writing a LookupService in Groovy for my ScriptedLookupService.  
> It is, as everyone had suggested, significantly faster than split/merge.
>
>
>
> I’m really very close to having it working.  In fact, it works fine when my
> record is simple; a few strings.
>
>
>
> My situation, though, is that my service returns an ARRAY of records for a 
> single value.  I can’t figure out how to construct the schema correctly.
>
>
>
> So far, I have
>
> public Optional<Record> lookup(Map<String, Object> coords) throws 
> LookupFailureException {
>
>   oneValueSchemaFields.add(new RecordField("number", new 
> DataType(RecordFieldType.INT,"")))
>
>   oneValueSchema = new SimpleRecordSchema(oneValueSchemaFields)
>
>
>
>   // so far so good. But I need to return an ARRAY of these oneValueSchemas.
>
>   def howDoIConstructThisSchema = new SimpleRecordSchema(??)
>
>
>
> String valueString = "[ {\"number\" : 1}, {\"number\" : 2}, {
> \"number\":3} ]"
>
>   def jsonSlurper = new groovy.json.JsonSlurper()
>
>   def values = jsonSlurper.parseText(valueString)
>
>
>
>  return Optional.ofNullable(new MapRecord(howDoIConstructThisSchema, values))
>
> }
>
> How do I construct the outer schema? How do you have one schema containing an 
> array of oneValueSchemas?  There’s something stupid I am missing, I am sure.
>
> I did try just sending in oneValueSchema into the MapRecord constructor, but 
> it doesn’t appear that that worked either.  Suggestions?
>
>
>
> Thanks
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> Geoffrey Greene
>
>


Re: Incremental Fetch in NIFI

2021-02-24 Thread Matt Burgess
Khaja,

There are two options in NiFi for incremental database fetch:
QueryDatabaseTable and GenerateTableFetch. The former is more often
used on a standalone NiFi cluster for single tables (as it does not
accept an incoming connection). It generates the SQL needed to do
incremental fetching, then executes the statements and writes out the
rows to the outgoing flowfile(s).  GenerateTableFetch is meant for a
cluster or for multiple tables (as it does accept an incoming
connection), and does the "first half" of what QueryDatabaseTable
does, generating SQL statements but not executing them. The statements
are written out as FlowFiles to be executed downstream.

In a cluster, these processors are meant to run on the primary node
only, otherwise each node will fetch the same information from the
database and handle it in its own copy of the flow. Set the processor
to run on the Primary Node Only. If using GenerateTableFetch, you can
distribute the generated SQL statements using a Remote Process Group
-> Input Port or a Load-Balanced Connection to an ExecuteSQL processor
downstream, which will parallelize the actual fetching among the
cluster nodes.
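A minimal sketch for the single-table case (table/column names are made up):

QueryDatabaseTable (scheduled on Primary Node Only):
  Database Connection Pooling Service: your DBCPConnectionPool
  Table Name: ORDERS
  Maximum-value Columns: LAST_UPDATED

Each run only fetches rows whose LAST_UPDATED is greater than the maximum
value seen on the previous run (the processor keeps that value in state).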

Regards,
Matt

On Wed, Feb 24, 2021 at 3:46 PM KhajaAsmath Mohammed
 wrote:
>
> Hi,
>
> I have a use case where I need to do incremental fetch on the oracle tables. 
> Is there a easy way to do this? I saw some posts about querydatabase table. 
> want to check if there is any efficient way to do this?
>
> Thanks,
> Khaja


Re: ScriptedLookupService

2021-02-25 Thread Matt Burgess
Geoffrey,

There are two main types of LookupService implementations used by
processors like LookupAttribute and LookupRecord, namely
LookupService<String> and LookupService<Record>. The former does a
single lookup and uses the single returned value. LookupRecord is most
often used with LookupService<Record> implementations and will insert
all fields from the returned record either as an entire record in one
field, or as the individual fields from the returned Record (depending on
how the processor is configured).

Long story short, to return multiple values you should implement
LookupService<Record>, making your lookup method have this signature:

public Optional<Record> lookup(Map<String, Object> coordinates) throws
LookupFailureException

The argument to the method is always Map<String, Object>, but the
return type is Optional<T> for whichever LookupService<T> you implement. Note
that you'll need to construct a Record in order to return multiple
values. Please let me know if you have any issues getting going.
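In Groovy that might look roughly like this (untested sketch, field names are
just examples):

import org.apache.nifi.serialization.SimpleRecordSchema
import org.apache.nifi.serialization.record.MapRecord
import org.apache.nifi.serialization.record.RecordField
import org.apache.nifi.serialization.record.RecordFieldType

// a schema describing the fields you want to hand back
def fields = [new RecordField('key1', RecordFieldType.STRING.getDataType()),
              new RecordField('key2', RecordFieldType.STRING.getDataType())]
def schema = new SimpleRecordSchema(fields)

// inside lookup(): return a single Record carrying both values
return Optional.of(new MapRecord(schema, [key1: 'value1', key2: 'value2']))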

Regards,
Matt

On Thu, Feb 25, 2021 at 3:56 PM Greene (US), Geoffrey N
 wrote:
>
> Writing my first ScriptedLookupService in groovy, and I think I have a pretty 
> simple question:
>
> I’d like to be able to return multiple values in one lookup, but I can’t 
> figure out what my return type needs to be.
>
> String isn’t right, obviously, and returning a Map isn’t 
> right.
>
> Is lookup only able to handle one value? Seems like you should be able to 
> look up multiple values.
>
>
>
> class MyValueLookupService implements LookupService {
>
> ComponentLog log = null
>
> final String ID = UUID.randomUUID().toString()
>
> @Override
>
> Optional lookup(Map lookupMap) { // <-- wrong 
> return type
>
> // this is wrong
>
> 
> Optional.ofNullable(slurper.parseText("{\"key1\":\"value1\",\"key2\":\"value2\"}"))
>
> }
>
>
>
> @Override
>
> Class getValueType() {
>
> // This is wrong too
>
> return Object
>
> }
>
> … other stuff
>
> }
>
> lookupService = new MyLookupService()
>
>
>
> Thanks
>
>
>
> Geoffrey Greene
>
>


Re: some questions about splits

2021-02-24 Thread Matt Burgess
Geoffrey,

There's a really good blog by the man himself [1] :) I highly recommend the
official blog in general; lots of great posts, and many are record-oriented
[2].

Regards,
Matt

[1] https://blogs.apache.org/nifi/entry/record-oriented-data-with-nifi
[2] https://blogs.apache.org/nifi/

On Wed, Feb 24, 2021 at 5:57 PM Greene (US), Geoffrey N <
geoffrey.n.gre...@boeing.com> wrote:

> Thank you for the fast response Mark.
>
>
>
> Hrm, record processing does sound useful.
>
>
>
> Are there any good blogs / documentation on this?  I’d really like to
> learn more.  I’ve been doing mostly text processing, as you’ve observed.
>
>
>
> My use case is something like this
>
> 1)  Use server API to get list of sensors
>
> 2)  Use server API to get list of jobs
>
> 3)  For each job, get count of frames/job.  There are up to 6 sets of
> frames/job, depending on which set you query for.
>
> 4)  Frames can only be queried 50 at a time, so for each 50, get set
> of time stamps of a frame
>
> 5)  For each time stamp, query for all sensor values at that time.
> These have to be queried one-at-a-time., because of the way the API works –
> one sensor value can only be given for one time.
>
> 6)  Glue all the data associated with that job (frame times, sensor
> readings, etc.) together and paste it into a big JSON. (There are more steps
> after that.)
>
>
>
>
>
> Geoffrey Greene
>
> Associate Technical Fellow/Senior Software Ninjaneer
>
> (703) 414 2421
>
> The Boeing Company
>
>
>
> *From:* Mark Payne [mailto:marka...@hotmail.com]
> *Sent:* Wednesday, February 24, 2021 5:20 PM
> *To:* users@nifi.apache.org
> *Subject:* [EXTERNAL] Re: some questions about splits
>
>
>
> EXT email: be mindful of links/attachments.
>
>
>
>
> Geoffrey,
>
>
>
> At a high level, if you’re splitting multiple times and then trying to
> re-assemble everything, then yes I think your thought process is correct.
> But you’ve no doubt seen how complex and cumbersome this approach can be.
> It can also result in extremely poor performance. So much so that when I
> began creating a series of YouTube videos on NiFi Anti-Patterns, the first
> anti-pattern that I covered was the splitting and re-merging of data [1].
>
>
>
> Generally, this should be an absolute last resort, and Record-oriented
> processors should be used instead of splitting the data up and re-merging
> it. If you need to perform REST calls, you could do that with LookupRecord,
> and either use the RESTLookupService or if that doesn’t fit the bill
> exactly you could actually use the ScriptedLookupService and write a small
> script in Groovy or Python that would perform the REST call for you and
> return the results. Or perhaps the ScriptedTransformRecord would be more
> appropriate - hard to tell without knowing the exact use case.
>
>
>
> Obviously, your mileage may vary, but switching the data flow to use
> record-oriented processors, if possible, would typically yield a flow that
> is much simpler and yield throughput that is at least an order of magnitude
> better.
>
>
>
> But if for whatever reason you do end up being stuck with the split/merge
> approach - the key would likely be to consider backpressure heavily. If you
> have backpressure set to 10,000 FlowFiles (the default) and then you’re
> trying to merge together data, but the data comes from many different
> upstream splits, you can certainly end up in a situation like this, where
> you don’t have all of the data from a given ’split’ queued up for
> MergeContent.
>
>
>
> Hope this helps!
>
> -Mark
>
>
>
> [1] https://www.youtube.com/watch?v=RjWstt7nRVY
>
>
>
>
>
> On Feb 24, 2021, at 4:59 PM, Greene (US), Geoffrey N <
> geoffrey.n.gre...@boeing.com> wrote:
>
>
>
> Im having some trouble with multiple splits/merges.  Here’s the idea:
>
>
>
>
>
> Big data -> split 1->Save all the fragment.*attributes into variables ->
> split 2-> save all the fragment.* attributes
>
> |
>
> Split 1
>
>|
>
> Save fragment.* attributes into split1.fragment.*
>
> |
>
> Split 2
>
> |
>
> Save fragment.* attributes into split2.fragment.* attributes
>
> |
>
> (More processing)
>
> |
>
> Split 3
>
> |
>
> Save fragment.* attributes into split3.fragment.* attributes
>
> |
>
> (other stuff)
>
> |
>
> Restore split3.fragment.* attributes to fragment.*
>
> |
>
> Merge3, using defragment strategy
>
> |
>
> Restore split2.fragment.* attributes to fragment.*
>
> |
>
> Merge 2 using defragment strategy
>
> |
>
> Restore split1.frragment.* attributes to fragment.*
>
> |
>
> Merge 1 using defragment strategy
>
>
>
> Am I thinking about this correctly?  It seems like sometimes, NiFi is
> unable to do a merge on some of the split data (errors like “there are 50
> fragments, but we only found one”).  Is it possible that I need to do some
> prioritization in the queues? I have noticed that things do back up and
> the queues seem to fill up as it’s going through (several of the splits need
> to perform REST calls and processing, which can take 

Re: Possible problem with DBCPConnectionPool 1.12.1

2021-04-19 Thread Matt Burgess
Carlos,

From the DBCP doc:

If maxIdle is set too low on heavily loaded systems it is possible you
will see connections being closed and almost immediately new
connections being opened. This is a result of the active threads
momentarily closing connections faster than they are opening them,
causing the number of idle connections to rise above maxIdle. The best
value for maxIdle for heavily loaded system will vary but the default
is a good starting point.

In your case, you have Max Idle set to 8 but you have 10 concurrent
tasks closing connections at the same time; I suspect this may cause
some mishandling of connections by the pool. Can you set Max Idle to
15 and see if the problem persists?

Thanks,
Matt

On Mon, Apr 19, 2021 at 4:11 PM Carlos Manuel Fernandes (DSI)
 wrote:
>
> Hello,
>
>
>
> After upgrading to NiFi 1.13.2 from NiFi 1.9.2, I noticed a problem with
> DBCPConnectionPool: several connections remain in the pool above the ‘Max Idle
> Connections’ number after the ‘Minimum Evictable Idle Time’ has passed.
>
>
>
> To reproduce the issue  I use a simple flow:  ExecuteSql (Concurrent 
> Tasks:10) ->LogAttribute
>
>
>
> On Execute Sql:
>
> Database Connection Pooling Service  : XPTO-CQ
>
> select query: select count(*) as cont 
> from Table
>
>
>
> On pool XPTO-CQ :
>
> Database Connection URL   
> jdbc:xxx:thin://xpto:10800
>
> Database Driver Class Name   org.apache.somedriver
>
> Validation query   select 1
>
> Minimum Idle Connections   0
>
> Max Idle Connections  8
>
> Max Connection Lifetime  -1
>
> Time Between Eviction Runs-1
>
> Minimum Evictable Idle Time   1 mins
>
> Soft Minimum Evictable Idle Time -1
>
>
>
> After running ExecuteSql several times, the number of ESTABLISHED connections
> (using netstat -na | grep 10800) is above 8 and they are never destroyed. The only
> workaround I found to avoid a rise in connections was to set ‘Max Idle
> Connections’ to 0, in practice not using the pool at all.
>
>
>
> Thanks
>
> Carlos
>
>


Re: nifi 1.11 /metrics and prometheus question

2021-08-09 Thread Matt Burgess
You might be running into NIFI-7379 [1] where the different Prometheus
components are writing to the same registries. If you upgrade to a
later version of NiFi you should see the correct data.

Regards,
Matt

[1] https://issues.apache.org/jira/browse/NIFI-7379

On Mon, Aug 9, 2021 at 11:35 AM Shailesh Ligade  wrote:
>
> Hello,
>
> In the data produced by /metrics, I can see aline
>
> # HELP nifi_backpressure_enabled Whether backpressure has been applied for 
> this component. Values are 0 or 1
>
> However I don’t see any data related to backpressure, even though I am pretty 
> sure there should be some backpressure data.
>
> Do I need to turn this on (nifi_backpressure_enabled) or some other
> property? If so, where? I don’t see anything related to this on the Prometheus
> reporting task.
>
> Thanks
>
> -S
>


Re: MiNiFi agent cannot update flow configuration

2021-09-21 Thread Matt Burgess
Tom,

Which implementation of the Provenance Repository are you using? If
not the VolatileProvenanceRepository, can you try that as a
workaround? Also are you using the 1.14.0 version of the C2 server?
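
For reference, a rough sketch of what that change looks like in MiNiFi's
conf/config.yml. The surrounding keys are an assumption based on the default
generated file and can differ by version, so treat this as a starting point
and adjust to your schema:

    Provenance Repository:
      provenance rollover time: 1 min
      implementation: org.apache.nifi.provenance.VolatileProvenanceRepository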

Regards,
Matt

On Tue, Sep 21, 2021 at 3:45 PM Tomislav Novosel
 wrote:
>
> Hi to all,
>
>
> I'm using MiNiFi 1.14.0 with a change ingestor configured to pull from an HTTP C2
> server whenever there is a change in configuration (a change in the NiFi flow that
> is supposed to be running on MiNiFi).
>
> MiNiFi agent is running on Raspberry Pi 3 with enough disk space.
>
>
> When I make a change and save the new template with the name
> template_name.v2, C2 pulls it,
> saves it into the ./cache folder and sends it to the MiNiFi agent.
>
>
> Then in MiNiFi agent log I have this error:
>
>
>
> 2021-09-21 12:54:26,456 ERROR [MiNiFi logging handler] 
> org.apache.nifi.minifi.StdErr Failed to start flow service: Unable to load 
> flow due to: java.lang.RuntimeException: Unable to create Provenance 
> Repository
> 2021-09-21 12:54:26,457 ERROR [MiNiFi logging handler] 
> org.apache.nifi.minifi.StdErr Shutting down...
> 2021-09-21 12:54:27,384 INFO [main] o.apache.nifi.minifi.bootstrap.RunMiNiFi 
> Swap file exists, MiNiFi failed trying to change configuration. Reverting to 
> old configuration.
> 2021-09-21 12:54:27,425 INFO [main] o.apache.nifi.minifi.bootstrap.RunMiNiFi 
> Replacing config file with swap file and deleting swap file
> 2021-09-21 12:54:27,444 INFO [main] o.apache.nifi.minifi.bootstrap.RunMiNiFi 
> Successfully spawned the thread to start Apache MiNiFi with PID 64002
> 2021-09-21 12:54:29,384 INFO [MiNiFi Bootstrap Command Listener] 
> o.apache.nifi.minifi.bootstrap.RunMiNiFi The thread to run Apache MiNiFi is 
> now running and listening for Bootstrap requests on port 38889
>
>
>
> It cannot change the flow configuration because it cannot create the Provenance
> Repository, and then it
> reverts to the old configuration of the flow.
>
> I tried deleting all the files in the ./provenance_repository folder and starting
> it again, but the same thing happens.
>
>
>
> Does anybody know why this happens?
>
>
>
> Thanks in advance,
>
> Regards,
>
> Tom


Re: Penalty feature of Processor (Disable)

2021-10-25 Thread Matt Burgess
The approach in #1 is already present in a few Put processors like
PutHive3QL, the property is named "Rollback on Failure" and takes a
boolean value. The docs explain that if set to false, the flowfile is
routed to failure, and if true will throw an exception and rollback
the session. Check RollbackOnFailure.java for more details.

Regards,
Matt

On Mon, Oct 25, 2021 at 4:46 PM Bryan Bende  wrote:
>
> The try/catch for IOException in LookupAttribute is after already
> calling session.get(), so it is separate from loading a flow file.
>
> The SimpleDatabaseLookupService catches SQLException and throws
> LookupFailureException which is the indicator to route to failure, and
> it lets IOException be thrown so that callers can decide what to do.
>
> Typically IOException would be considered retryable so the current
> behavior seems reasonable, but in this case the user wants to decide
> not to retry which currently can't be done.
>
> Seems like two options...
>
> 1) Introduce a new property like "Communication Error Strategy" with
> choices of "Rollback" (current) or "Route to Failure" (needed for this
> case).
>
> 2) Introduce a new relationship like "Retry" and instead of throwing
> ProcessException when catching IOException, instead route to Retry. It
> is then up to the user to decide if they want to connect Retry back to
> self to get the current behavior, auto-terminate it, or connect it to
> the next processor like this case wants to do.
>
>
> On Mon, Oct 25, 2021 at 4:01 PM Edward Armes  wrote:
> >
> > Hmm, it sounds like to me there might be 2 bugs here.
> >
> > One in the lookup attribute processor not isolating the loading of 
> > attributes from a FlowFile which may legitimately cause an IOException that 
> > would result in the FlowFile needing to be retried. The other in the
> > TeradataDB lookup service not returning suitable errors that indicate if 
> > the issue is transient and a retry is needed or if it's a failure and 
> > should be routed to the failure queue.
> >
> > Edward
> >
> > On Mon, 25 Oct 2021, 16:50 Bryan Bende,  wrote:
> >>
> >> I'm not 100% sure on this, but I think the issue is that when 
> >> LookupAttribute calls the LookupService, it catches IOException and throws 
> >> a ProcessException, which rolls back the current session and puts the 
> >> incoming flow files back in the preceding queue. The idea is that it would 
> >> then retry the flow files until the comms issue is resolved, but in your 
> >> case you don't want that.
> >>
> >> I think there would need to be an enhancement to LookupAttribute that adds 
> >> a property to control the behavior on IOException so that the user can 
> >> decide between rollback vs route to failure.
> >>
> >> On Mon, Oct 25, 2021 at 11:29 AM Etienne Jouvin  
> >> wrote:
> >>>
> >>> Hello all.
> >>>
> >>> You can decrease the penalty value on the processor.
> >>> Set to 0 for example.
> >>>
> >>>
> >>>
> >>> Le lun. 25 oct. 2021 à 16:22, Bilal Bektas  a 
> >>> écrit :
> 
>  Hi Community,
> 
> 
> 
>  We use the LookupAttribute processor in order to get a lookup value from 
>  Teradata or Oracle DB. The processors work as follows:
> 
> 
> 
>  LookupAttribute (Teradata)  ---(failure & unmatched) ---> 
>  LookupAttribute (Oracle)
> 
> 
> 
>  This flow works well, and LookupAttribute (Teradata) penalizes flow 
>  files when the Teradata DB is down. Therefore, the queue on the upstream 
>  connection of LookupAttribute (Teradata) increases. But we don't want 
>  LookupAttribute (Teradata) to penalize flow files. We want the 
>  LookupAttribute (Teradata) processor to forward flow files to its failure 
>  downstream connection in every failure situation, so that LookupAttribute 
>  (Oracle) can process flow files which could not be processed by 
>  LookupAttribute (Teradata).
> 
> 
> 
>  Is it possible to disable the penalty feature of the processor, or is there any 
>  solution you can suggest for this situation?
> 
> 
> 
>  Thank you in advance,
> 
> 
> 
>  --Bilal
> 
>  obase
>  TEL: +90216 527 30 00
>  FAX: +90216 527 31 11
> 

Re: Expression language within scripts in ExecuteScript

2021-07-19 Thread Matt Burgess
Jim,

You can apply Chris's solution to ExecuteScript itself, as you can add
dynamic properties there and they will evaluate the EL for you. If you
set a dynamic property "myNode" to "${ip()}", you should be able to
use myNode.evaluateAttributeExpressions().getValue().
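
As a minimal Jython sketch of that idea (assuming a dynamic property named
"myNode" with the value ${ip()} has been added to the ExecuteScript processor;
the "node.address" attribute name is just for illustration):

    flowFile = session.get()
    if flowFile is not None:
        # dynamic properties are bound into the script by name as PropertyValue
        # objects; evaluateAttributeExpressions() evaluates the EL at this point
        nodeAddress = myNode.evaluateAttributeExpressions().getValue()
        flowFile = session.putAttribute(flowFile, 'node.address', nodeAddress)
        session.transfer(flowFile, REL_SUCCESS)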

Regards,
Matt

On Mon, Jul 19, 2021 at 5:56 AM James McMahon  wrote:
>
> Chris, thank you very much for suggesting a few alternatives. I'll tune my 
> flow this morning and give these a try, too. I think the path suggested by 
> Lars is going to prove ideal for my situation, where I want to enter my 
> ExecuteScript with one flowFile and create multiple flowFiles from that in a 
> loop, as I work my way through a series of directories in a python loop, 
> pulling metadata from them. Since I'll be running my ES in "all node" 
> configuration, if I'm on a four-node cluster my created flowFiles can be on 
> any of the cluster nodes. I want to identify the host node for each of the 
> created flowFiles within my script, where I need to apply that knowledge. 
> Lars suggested leveraging python rather than trying to figure out how to do 
> this programmatically through nifi session and flowFile objects.
> Cheers and thanks very much once again,
> Jim
>
> On Mon, Jul 19, 2021 at 1:41 AM Chris Sampson  
> wrote:
>>
>> If you're using GenerateFlowFile, you could add a dynamic property to set an 
>> attribute using the ${hostname} (or ip) on the created FlowFile.
>>
>> If you're only running Generate on a single node before load balancing the 
>> FlowFile (e.g. round-robin) and the expression needs to be calculated on the 
>> destination host, you could use an UpdateAttribute to create the attribute 
>> before ExecuteScript but after the load balancing.
>>
>> Of course, this doesn't answer your original question about evaluating EL 
>> within a script, but gives you a couple of alternate options.
>>
>> Cheers,
>>
>> Chris Sampson
>>
>> On Sun, 18 Jul 2021, 12:46 James McMahon,  wrote:
>>>
>>> Lars, thank you so much. I was fixated on getting the host information from 
>>> the flowFile itself and overlooked the obvious. Your suggestion will be 
>>> ideal. Thank you once again!
>>> Jim
>>>
>>> On Sun, Jul 18, 2021 at 12:59 AM Lars Winderling 
>>>  wrote:

 James,

 Maybe just use:
 import socket
 socket.gethostname()
 It will give you rather the hostname, but that should also help for 
 distinguishing between nodes.

 Best, Lars

 On 17 July 2021 22:25:47 CEST, James McMahon  wrote:
>
> Looking closely at the flowFiles I am creating, in the subsequent output 
> queue, I see they have a Node Address listed in FlowFile Details. It is 
> not listed in the flowfile attributes.
> That is what I need to get at programmatically in my python script. How 
> can I access Node Address?
>
> On Sat, Jul 17, 2021 at 2:59 PM James McMahon  
> wrote:
>>
>> I have a single flowfile I generate on a periodic basis using a cron 
>> scheduled GenerateFlowFile. This then flows into an ExecuteScript, where 
>> I have a python script that will create multiple flowfiles from the one.
>>
>> My ExecuteScript is configured to run on all my cluster nodes. For each 
>> instance of flowfile I am creating, I need to determine which cluster 
>> node it associates with. There’s an expression language function called 
>> ip(). Can anyone tell me how to employ ${ip()} in my python to determine 
>> the cluster node the newly created flowFile is associated with?
>>
>> I’d be using this after I execute
>> flowfile = session.create()
>>
>> Thanks in advance for your help.


Re: NiFi Queue Monitoring

2021-07-21 Thread Matt Burgess
Scott,

Glad to hear it! Please let me know if you have any questions or if
issues arise. One thing I forgot to mention is that I think
backpressure prediction is disabled by default due to the extra
consumption of CPU to do the regressions, make sure the
"nifi.analytics.predict.enabled" property in nifi.properties is set to
"true" before starting NiFi.

Regards,
Matt

On Wed, Jul 21, 2021 at 7:21 PM scott  wrote:
>
> Excellent! Very much appreciate the help and for setting me on the right 
> path. I'll give the queryNiFiReportingTask code a try.
>
> Scott
>
> On Wed, Jul 21, 2021 at 3:26 PM Matt Burgess  wrote:
>>
>> Scott et al,
>>
>> There are a number of options for monitoring flows, including
>> backpressure and even backpressure prediction:
>>
>> 1) The REST API for metrics. As you point out, it's subject to the
>> same authz/authn as any other NiFi operation and doesn't sound like it
>> will work out for you.
>> 2) The Prometheus scrape target via the REST API. The issue would be
>> the same as #1 I presume.
>> 3) PrometheusReportingTask. This is similar to the REST scrape target
>> but isn't subject to the usual NiFi authz/authn stuff, however it does
>> support SSL/TLS for a secure solution (and is also a "pull" approach
>> despite it being a reporting task)
>> 4) QueryNiFiReportingTask. This is not included with the NiFi
>> distribution but can be downloaded separately, the latest version
>> (1.14.0) is at [1]. I believe this is what Andrew was referring to
>> when he mentioned being able to run SQL queries over the information,
>> you can do something like "SELECT * FROM CONNECTION_STATUS_PREDICTIONS
>> WHERE predictedTimeToBytesBackpressureMillis < 1". This can be
>> done either as a push or pull depending on the Record Sink you choose.
>> A SiteToSiteReportingRecordSink, KafkaRecordSink, or LoggingRecordSink
>> results in a push (to NiFi, Kafka, or nifi-app.log respectively),
>> where a PrometheusRecordSink results in a pull the same as #2 and #3.
>> There's even a ScriptedRecordSink where you can write your own script
>> to put the results where you want them.
>> 5) The other reporting tasks. These have been mentioned frequently in
>> this thread so no need for elaboration here :)
>>
>> Regards,
>> Matt
>>
>> [1] 
>> https://repository.apache.org/content/repositories/releases/org/apache/nifi/nifi-sql-reporting-nar/1.14.0/
>>
>> On Wed, Jul 21, 2021 at 5:58 PM scott  wrote:
>> >
>> > Great comments all. I agree with the architecture comment about push 
>> > monitoring. I've been monitoring applications for more than 2 decades now, 
>> > but sometimes you have to work around the limitations of the situation. It 
>> > would be really nice if NiFi had this logic built-in, and frankly I'm 
>> > surprised it is not yet. I can't be the only one who has had to deal with 
>> > queues filling up, causing problems downstream. NiFi certainly knows that 
>> > the queues fill up, they change color and execute back-pressure logic. If 
>> > it would just do something simple like write a log/error message to a log 
>> > file when this happens, I would be good.
>> > I have looked at the new metrics and reporting tasks but still haven't 
>> > found the right thing to do to get notified when any queue in my instance 
>> > fills up. Are there any examples of using them for a similar task you can 
>> > share?
>> >
>> > Thanks,
>> > Scott
>> >
>> > On Wed, Jul 21, 2021 at 11:29 AM u...@moosheimer.com  
>> > wrote:
>> >>
>> >> In general, it is a bad architecture to do monitoring via pull request. 
>> >> You should always push. I recommend a look at the book "The Art of 
>> >> Monitoring" by James Turnbull.
>> >>
>> >> I also recommend the very good articles by Pierre Villard on the subject 
>> >> of NiFi monitoring at 
>> >> https://pierrevillard.com/2017/05/11/monitoring-nifi-introduction/.
>> >>
>> >> Hope this helps.
>> >>
>> >> Mit freundlichen Grüßen / best regards
>> >> Kay-Uwe Moosheimer
>> >>
>> >> Am 21.07.2021 um 16:45 schrieb Andrew Grande :
>> >>
>> >> 
>> >> Can't you leverage some of the recent nifi features and basically run sql 
>> >> queries over NiFi metrics directly as part of the flow? Then act on it 
>> >> with a full flexibility of the flow. Kinda like a push design.
>> >>
>> >> Andrew
>> >>
>> >> On Tue, Jul 20, 2021, 2:31 PM scott  wrote:
>> >>>
>> >>> Hi all,
>> >>> I'm trying to setup some monitoring of all queues in my NiFi instance, 
>> >>> to catch before queues become full. One solution I am looking at is to 
>> >>> use the API, but because I have a secure NiFi that uses LDAP, it seems 
>> >>> to require a token that expires in 24 hours or so. I need this to be an 
>> >>> automated solution, so that is not going to work. Has anyone else 
>> >>> tackled this problem with a secure LDAP enabled cluster?
>> >>>
>> >>> Thanks,
>> >>> Scott


Re: NiFi Queue Monitoring

2021-07-21 Thread Matt Burgess
Scott et al,

There are a number of options for monitoring flows, including
backpressure and even backpressure prediction:

1) The REST API for metrics. As you point out, it's subject to the
same authz/authn as any other NiFi operation and doesn't sound like it
will work out for you.
2) The Prometheus scrape target via the REST API. The issue would be
the same as #1 I presume.
3) PrometheusReportingTask. This is similar to the REST scrape target
but isn't subject to the usual NiFi authz/authn stuff, however it does
support SSL/TLS for a secure solution (and is also a "pull" approach
despite it being a reporting task)
4) QueryNiFiReportingTask. This is not included with the NiFi
distribution but can be downloaded separately, the latest version
(1.14.0) is at [1]. I believe this is what Andrew was referring to
when he mentioned being able to run SQL queries over the information,
you can do something like "SELECT * FROM CONNECTION_STATUS_PREDICTIONS
WHERE predictedTimeToBytesBackpressureMillis < 1". This can be
done either as a push or pull depending on the Record Sink you choose.
A SiteToSiteReportingRecordSink, KafkaRecordSink, or LoggingRecordSink
results in a push (to NiFi, Kafka, or nifi-app.log respectively),
where a PrometheusRecordSink results in a pull the same as #2 and #3.
There's even a ScriptedRecordSink where you can write your own script
to put the results where you want them.
5) The other reporting tasks. These have been mentioned frequently in
this thread so no need for elaboration here :)

Regards,
Matt

[1] 
https://repository.apache.org/content/repositories/releases/org/apache/nifi/nifi-sql-reporting-nar/1.14.0/

On Wed, Jul 21, 2021 at 5:58 PM scott  wrote:
>
> Great comments all. I agree with the architecture comment about push 
> monitoring. I've been monitoring applications for more than 2 decades now, 
> but sometimes you have to work around the limitations of the situation. It 
> would be really nice if NiFi had this logic built-in, and frankly I'm 
> surprised it is not yet. I can't be the only one who has had to deal with 
> queues filling up, causing problems downstream. NiFi certainly knows that the 
> queues fill up, they change color and execute back-pressure logic. If it 
> would just do something simple like write a log/error message to a log file 
> when this happens, I would be good.
> I have looked at the new metrics and reporting tasks but still haven't found 
> the right thing to do to get notified when any queue in my instance fills up. 
> Are there any examples of using them for a similar task you can share?
>
> Thanks,
> Scott
>
> On Wed, Jul 21, 2021 at 11:29 AM u...@moosheimer.com  
> wrote:
>>
>> In general, it is a bad architecture to do monitoring via pull request. You 
>> should always push. I recommend a look at the book "The Art of Monitoring" 
>> by James Turnbull.
>>
>> I also recommend the very good articles by Pierre Villard on the subject of 
>> NiFi monitoring at 
>> https://pierrevillard.com/2017/05/11/monitoring-nifi-introduction/.
>>
>> Hope this helps.
>>
>> Mit freundlichen Grüßen / best regards
>> Kay-Uwe Moosheimer
>>
>> Am 21.07.2021 um 16:45 schrieb Andrew Grande :
>>
>> 
>> Can't you leverage some of the recent nifi features and basically run sql 
>> queries over NiFi metrics directly as part of the flow? Then act on it with 
>> a full flexibility of the flow. Kinda like a push design.
>>
>> Andrew
>>
>> On Tue, Jul 20, 2021, 2:31 PM scott  wrote:
>>>
>>> Hi all,
>>> I'm trying to setup some monitoring of all queues in my NiFi instance, to 
>>> catch before queues become full. One solution I am looking at is to use the 
>>> API, but because I have a secure NiFi that uses LDAP, it seems to require a 
>>> token that expires in 24 hours or so. I need this to be an automated 
>>> solution, so that is not going to work. Has anyone else tackled this 
>>> problem with a secure LDAP enabled cluster?
>>>
>>> Thanks,
>>> Scott


Re: Persisting a logged message to the bulletin board

2021-07-22 Thread Matt Burgess
Jim,

Are you doing the whole series of directories in one call to
onTrigger? If so you could keep getting the current time and if you
haven't switched directories then you could reissue the bulletin if
the elapsed time > 5 mins, then reset the variable to determine the
next elapsed time.
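
A rough Jython sketch of that pattern (the directory list and the per-item
work are placeholders for whatever your script already does):

    import time

    BULLETIN_SECONDS = 5 * 60            # bulletins age off after about 5 minutes

    for directory in directories_to_process:     # placeholder list
        message = 'Processing directory: ' + directory
        log.info(message)
        last_posted = time.time()
        for item in list_work(directory):        # placeholder for the real work
            do_work(item)
            if time.time() - last_posted > BULLETIN_SECONDS:
                log.info(message)                # re-issue so the bulletin stays visible
                last_posted = time.time()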

Regards,
Matt

On Thu, Jul 22, 2021 at 1:55 PM James McMahon  wrote:
>
> I'm using a python script from an ExecuteScript processor to work through a 
> series of directories. As I begin processing a directory, I log a message to 
> the bulletin board using log.info(myMessage).
>
> After five minutes, that message expires and is no longer visible. However I 
> work within each directory for quite some time and want to persist this 
> message as a bulletin until I move to the next directory. How can I force a 
> log message to repost with every nifi five minute bulletin cycle? Thanks in 
> advance.
> Jim


Re: NiFi Queue Monitoring

2021-07-27 Thread Matt Burgess
I’m planning on doing one all about QueryNiFiReportingTask and the RecordSinks; 
I can include this use case if you like, but would definitely encourage you to 
blog it as well :) My blog is at https://funnifi.blogspot.com as an example; 
there are many others as well.

Regards,
Matt

> On Jul 27, 2021, at 5:17 PM, scott  wrote:
> 
> 
> Joe,
> I'm not sure. What would be involved? I'm not familiar with a NiFi blog, can 
> you point me to some examples?
> 
> Thanks,
> Scott
> 
>> On Tue, Jul 27, 2021 at 10:00 AM Joe Witt  wrote:
>> Scott
>> 
>> This sounds pretty darn cool.  Any chance you'd be interested in
>> kicking out a blog on it?
>> 
>> Thanks
>> 
>> On Tue, Jul 27, 2021 at 9:58 AM scott  wrote:
>> >
>> > Matt/all,
>> > I was able to solve my problem using the QueryNiFiReportingTask with 
>> > "SELECT * FROM CONNECTION_STATUS WHERE isBackPressureEnabled = true" and 
>> > the new LoggingRecordSink as you suggested. Everything is working 
>> > flawlessly now. Thank you again!
>> >
>> > Scott
>> >
>> > On Wed, Jul 21, 2021 at 5:09 PM Matt Burgess  wrote:
>> >>
>> >> Scott,
>> >>
>> >> Glad to hear it! Please let me know if you have any questions or if
>> >> issues arise. One thing I forgot to mention is that I think
>> >> backpressure prediction is disabled by default due to the extra
>> >> consumption of CPU to do the regressions, make sure the
>> >> "nifi.analytics.predict.enabled" property in nifi.properties is set to
>> >> "true" before starting NiFi.
>> >>
>> >> Regards,
>> >> Matt
>> >>
>> >> On Wed, Jul 21, 2021 at 7:21 PM scott  wrote:
>> >> >
>> >> > Excellent! Very much appreciate the help and for setting me on the 
>> >> > right path. I'll give the queryNiFiReportingTask code a try.
>> >> >
>> >> > Scott
>> >> >
>> >> > On Wed, Jul 21, 2021 at 3:26 PM Matt Burgess  
>> >> > wrote:
>> >> >>
>> >> >> Scott et al,
>> >> >>
>> >> >> There are a number of options for monitoring flows, including
>> >> >> backpressure and even backpressure prediction:
>> >> >>
>> >> >> 1) The REST API for metrics. As you point out, it's subject to the
>> >> >> same authz/authn as any other NiFi operation and doesn't sound like it
>> >> >> will work out for you.
>> >> >> 2) The Prometheus scrape target via the REST API. The issue would be
>> >> >> the same as #1 I presume.
>> >> >> 3) PrometheusReportingTask. This is similar to the REST scrape target
>> >> >> but isn't subject to the usual NiFi authz/authn stuff, however it does
>> >> >> support SSL/TLS for a secure solution (and is also a "pull" approach
>> >> >> despite it being a reporting task)
>> >> >> 4) QueryNiFiReportingTask. This is not included with the NiFi
>> >> >> distribution but can be downloaded separately, the latest version
>> >> >> (1.14.0) is at [1]. I believe this is what Andrew was referring to
>> >> >> when he mentioned being able to run SQL queries over the information,
>> >> >> you can do something like "SELECT * FROM CONNECTION_STATUS_PREDICTIONS
>> >> >> WHERE predictedTimeToBytesBackpressureMillis < 1". This can be
>> >> >> done either as a push or pull depending on the Record Sink you choose.
>> >> >> A SiteToSiteReportingRecordSink, KafkaRecordSink, or LoggingRecordSink
>> >> >> results in a push (to NiFi, Kafka, or nifi-app.log respectively),
>> >> >> where a PrometheusRecordSink results in a pull the same as #2 and #3.
>> >> >> There's even a ScriptedRecordSink where you can write your own script
>> >> >> to put the results where you want them.
>> >> >> 5) The other reporting tasks. These have been mentioned frequently in
>> >> >> this thread so no need for elaboration here :)
>> >> >>
>> >> >> Regards,
>> >> >> Matt
>> >> >>
>> >> >> [1] 
>> >> >> https://repository.apache.org/content/repositories/releases/org/apache/nifi/nifi-sql-reporting-nar/1.14.0/
>> >> >>

Re: PutSQL in combination with ConvertJSONToSQL gives java.sql.SQLException: Invalid column type for Oracle DataType BINARY_DOUBLE

2022-01-20 Thread Matt Burgess
Sven,

This is a recently discovered bug, I am still working on
characterizing the issue before writing a Jira to describe it.
NIFI-9169 [1] has the same cause but is a slightly different issue. So
far the issue seems to be with using update key(s) with "Quote
Identifiers" set to true. Setting it to false (if possible for your
flow) should allow it to work.

Regards,
Matt

[1] https://issues.apache.org/jira/browse/NIFI-9169

On Thu, Jan 20, 2022 at 11:15 AM Sven Ritter  wrote:
>
> Hi Mark,
>
>
>
> Thanks for the hint with the PutDatabaseRecord processor.
>
> I tried it. For INSERTS it works fine, but the UPDATE is failing with:  
> “Routing to failure.: ORA-00936: missing expression”.
>
> When I enable DEBUG and force the processor to write out the SQL query (by 
> setting Maximum Batch Size = 1), I get the following:
>
>
>
> PutDatabaseRecord[id=cc8716d2-3adc-1ca6-e173-69c231e48341] Executing query 
> UPDATE "FIRST_LEVEL" SET "S_START_TIME" = ?, "S_ID_KEY" = ?, 
> "S_STATUS_MESSAGE" = ?, "S_STATUS" = ? WHERE  because batch reached max size 
> for 
> StandardFlowFileRecord[uuid=e4389b22-b9a6-4632-9321-96e56285f1d7,claim=StandardContentClaim
>  [resourceClaim=StandardResourceClaim[id=1642609386401-6, container=default, 
> section=6], offset=77170, 
> length=105],offset=0,name=dd5e6bae-9a5f-40e6-826d-d8d205a3154b,size=105]; 
> fieldIndexes: [0, 1, 2, 3]; batch index: 1; batch size: 1
>
>
>
> It seems that “Update Keys” (S_ID_KEY) is not recognized by the processor, 
> because S_ID_KEY is in the UPDATE itself and the WHERE condition is empty.
>
> What did I do wrong?
>
>
>
> Best Regards
>
> Sven
>
>


Re: initial NiFi 1.15.3 start-up issue on mongo-uri sensitive property complaint

2022-01-26 Thread Matt Burgess
Jeremy,

I can't reproduce this on the latest main branch (closest to 1.15.3).
What's weird about that error message is that it says 'mongo-uri' is a
sensitive property, but it is not.  I set up a Parameter Context (PC)
with a non-sensitive parameter "muri", set the PC on the root Process
Group (where the Controller Service (CS) is defined), I didn't get any
errors about sensitivity. Then I restarted NiFi and everything worked
as expected.

Just to see what happens I tried it the other way, with "muri" being
sensitive. I then got the expected error in the UI that the
sensitivity of the property doesn't match the sensitivity of the
parameter. I then stopped NiFi, edited the flow.json, and restarted.
Instead of crashing, the CS was marked invalid with the aforementioned
message about mismatched sensitivity.

Does this only happen with your own Mongo processors? Is the Mongo URI
property sensitive in your component(s)?

Regards,
Matt


Re: NiFi hanging during large sql query

2023-09-02 Thread Matt Burgess
When you said "fetchSize set low", I assume you mean non-zero, a zero
will fetch all the rows at once. How did you paginate your query with
ExecuteSQLRecord? I was going to suggest GenerateTableFetch in front
to paginate the queries for you, but it definitely seems like we
should be able to do or configure something that will make PG happy.
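
For reference, the kind of paginated statement that keeps the PG driver from
pulling a huge chunk at once looks roughly like this (table/column names are
placeholders; GenerateTableFetch would emit the LIMIT/OFFSET variants for you):

    SELECT id, payload
    FROM big_table
    WHERE id > 123456          -- last key seen in the previous page
    ORDER BY id
    LIMIT 100000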

Regards,
Matt

On Sat, Sep 2, 2023 at 11:36 AM Mark Payne  wrote:
>
> Thanks for sharing the solution Mike. Is there something we need to update in 
> nifi to prevent this from biting others?
>
> Thanks
> Mark
>
> Sent from my iPhone
>
> On Sep 2, 2023, at 9:48 AM, Joe Witt  wrote:
>
> 
> Nice.  Gald you found it.
>
> On Sat, Sep 2, 2023 at 5:07 AM Mike Thomsen  wrote:
>>
>> It was the PostgreSQL JDBC driver. If you don't paginate the query 
>> aggressively, it will try to load a significant chunk of the table into 
>> memory rather than just pulling chunks, even with fetchSize set low.
>>
>> On Fri, Sep 1, 2023 at 6:01 PM Mike Thomsen  wrote:
>>>
>>> I have a three node cluster with an executesqlrecord processor with primary 
>>> execution only. The sql it runs is a straight forward select on a table 
>>> with about 44m records. If I leave it running, after about 10 min the node 
>>> becomes unresponsive and leaves the cluster. The query runs just fine in 
>>> jetbrains data grip on that postgresql server, so I don’t think it’s 
>>> anything weird with the db or query. Any ideas about what could be causing 
>>> this? Even with a high limit like 5m records the query doesn’t lock up the 
>>> NiFi node.
>>>
>>> Sent from my iPhone


Re: Files in /tmp folder

2023-08-31 Thread Matt Burgess
Can you share your conf/state-management.xml contents?

On Mon, Aug 28, 2023 at 8:33 AM Williams, Van 
wrote:

> There are files that are appearing in the /tmp folder on some of our NiFi
> Linux hosts. The files all begin with 'file', and they somewhat quickly
> fill up that folder (we have an automated job to keep that clear while we
> investigate the issue). This started happening within the last 2-3 months.
> I'm guessing that there may have been a .conf or a .properties files
> change, but cannot tell for sure.
>
> We have tried stopping/disabling all processes in NiFi, but the files are
> still appearing. We have also stopped NiFi entirely, and that did stop the
> files from being generated, but the files began to be generated again when
> NiFi was restarted.
>
> The content of the files are all the same, and have some text from SQLite
> within them:
>
> tablekeyskeys^DCREATE TABLE keys (label_id INTEGER primary key
> AUTOINCREMENT,label TEXT unique,value TEXT,type INTEGER,lcrypto
> INTEGER,vcrypto INTEGER)'^D^F^W;^U^A^@indexsqlite_autoindex_keys_1keys
>
> Any ideas?
>
>
>
>
>
> *Van Williams*
>
> Lead BizOps Engineer
>
>
>
> Mastercard | mobile 314-691-3047
>
> 
>
>
>


Re: executescriipt and jython logging

2023-08-22 Thread Matt Burgess
Richard,

I'll look into the logging stuff, definitely strange behavior. Are you
using ExecuteScript or InvokeScriptedProcessor? I noticed an
intermittent failure on InvokeScriptedProcessor in 1.20 [1] but maybe
it's present in more of the scripting stuff as they use the same
classes to handle the engines and scripts.

Regards,
Matt

[1] https://issues.apache.org/jira/browse/NIFI-11148

On Tue, Aug 22, 2023 at 7:19 AM Richard Beare  wrote:
>
> It seems stranger than I thought. I have the following in the callback:
>
> log.info("Simple text")
> log.info("Block size " + str(type(block_of_days)) + "\n")
> log.info("Oldest time: " + oldest_time_str)
> log.info("Newest time: " + newest_time_str)
> log.info("Previous newest: " + 
> iso2str.format(previous_last_timestamp))
>
> The last 3 (from "Oldest time") and another later info message which is a 
> pure string appear in the log, but the first two don't.
>
>
> On Tue, Aug 22, 2023 at 8:36 PM Richard Beare  wrote:
>>
>> Hi all,
>> I'm having issues with logging from python. I have the processor set to 
>> debug level logging but I only see message from inside the callback;
>>
>> i.e
>>
>> flowFile = session.write(flowFile, PyStreamCallback())
>>
>> calls to log.error and log.info from inside the process method of 
>> PyStreamCallback() work fine.
>>
>> However calls to log.error, log.warn etc from inside the typical "if 
>> flowFile != None" block are not appearing as bulletins. They don't cause 
>> errors either. What am I missing?
>>
>> As an aside, I also seem to have trouble with changes to the script being 
>> registered - that seems intermittent.
>>
>> I'm using nifi 1.20
>>
>> Thanks


Re: NiFi to draw samples from very large raw data sets

2022-05-19 Thread Matt Burgess
If you have large FlowFiles and are trying to sample records from
each, you can use SampleRecord. It has Interval Sampling,
Probabilistic Sampling, and Reservoir Sampling strategies, and I have
a PR [1] up to add Range Sampling [2].
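
For example, the fixed-size draw of 1,032 records could be configured roughly
as follows (property names are from memory of the SampleRecord configuration,
so verify against the usage docs; the value is just your stated sample size):

    Sampling Strategy: Reservoir Sampling
    Reservoir Size: 1032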

Regards,
Matt

[1] https://github.com/apache/nifi/pull/5878
[2] https://issues.apache.org/jira/browse/NIFI-9814

On Thu, May 19, 2022 at 6:20 AM James McMahon  wrote:
>
> I have been tasked to draw samples from very large raw data sets for triage 
> analysis. I am to provide multiple sampling methods. Drawing a random sample 
> of N records is one method. A second method is to draw a fixed sample of 
> 1,032 records from stratified defined date boundaries in a set. The latter is 
> of interest because raw data can substantially change structure or even 
> format at points in time, and we need to be able to sample within those data 
> boundaries.
>
> Can anyone offer a link to an example of how nifi may be used to draw samples 
> randomly and/or in a systematic way from raw data collections?


Re: nifi-content-viewer with mime.type of image/webp

2022-05-23 Thread Matt Burgess
It didn't display in NiFi until NIFI-10027 [1], which has recently
been merged. It will be in the upcoming 1.17.0 release (or perhaps a
1.16.3 if the current RC is not released).

Regards,
Matt

[1] https://issues.apache.org/jira/browse/NIFI-10027

On Mon, May 23, 2022 at 11:46 AM Ryan Hendrickson
 wrote:
>
> > "Note that as NiFi uses a browser, the image(s) will only be displayed if 
> > the browser supports WebP" - Matt B. in ticket.
>
> Is that the case?  I can open a WebP image in Chrome, but if I read it in 
> from disk, set the mime.type correctly, it will not display in NiFi.
>
> On Sat, May 14, 2022 at 2:16 AM Ryan Hendrickson 
>  wrote:
>>
>> https://issues.apache.org/jira/browse/NIFI-10027   Add image/webp support to 
>> the content viewer
>>
>> WebP is a modern image format that provides superior lossless and lossy 
>> compression for images on the web. Using WebP, webmasters and web developers 
>> can create smaller, richer images that make the web faster.
>>
>> WebP lossless images are 26% smaller in size compared to PNGs. WebP lossy 
>> images are 25-34% smaller than comparable JPEG images at equivalent SSIM 
>> quality index.
>>
>> Lossless WebP supports transparency (also known as alpha channel) at a cost 
>> of just 22% additional bytes. For cases when lossy RGB compression is 
>> acceptable, lossy WebP also supports transparency, typically providing 3× 
>> smaller file sizes compared to PNG.
>>
>> On Thu, May 12, 2022 at 12:28 PM Joe Witt  wrote:
>>>
>>> Ryan.
>>>
>>> Nope - just means we dont have a viewer for that content type.  Would need 
>>> to be added.
>>>
>>> Thanks
>>>
>>> On Thu, May 12, 2022 at 9:19 AM Ryan Hendrickson 
>>>  wrote:

 Hi all,
 I'm curious if anyone has used the nifi-content-viewer to view images with 
 a mime.type of "image/webp".

 When I load up an image with that mime.type it says "No viewer is 
 registered for this content type"

 https://developers.google.com/speed/webp

 Thanks,
 Ryan


Re: Most memory-efficient way in NiFi to fetch an entire RDBMS table?

2022-06-22 Thread Matt Burgess
Mike,

I recommend QueryDatabaseTableRecord with judicious choices for the
following properties:

Fetch Size: This should be tuned to return the most number of rows
without causing network issues such as timeouts. Can be set to the
same value as Max Rows Per Flow File ensuring one fetch per outgoing
FlowFile
Max Rows Per Flow File: This should be set to a reasonable number of
rows per FlowFile, maybe 100K or even 1M if that doesn't cause issues
(see above)
Output Batch Size: This is the key to doing full selects on huge
tables, as it allows FlowFiles to be committed to the session and
passed downstream while the rest of the fetch is being processed. In
your case if you set Max Rows to 100K then this could be 10, or if you
set it to 1M it could be 1. Note that with this property set, the
maxvalue.* and fragment.count attributes will not be set on these
FlowFiles, so you can't merge them.  I believe the maxvalue state will
still be updated even if this property is used, so it should turn into
an incremental fetch after the first full fetch is complete.
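
As a concrete starting point for a table this size (a sketch only; tune Fetch
Size against your driver, network, and heap):

    Fetch Size: 100000
    Max Rows Per Flow File: 100000
    Output Batch Size: 10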

Regards,
Matt

On Wed, Jun 22, 2022 at 10:00 AM Mike Thomsen  wrote:
>
> We have a table with 68M records that will blow up to over 250M soon,
> and need to do a full table fetch on it. What's the best practice for
> efficiently doing a partial or full select on it?
>
> Thanks,
>
> Mike


Re: Minifi and ssl config on NiFi

2022-04-17 Thread Matt Burgess
MiNiFi is actually alive and well, we just moved it into the NiFi codebase. 
We’re actively developing a Command-and-Control (C2) capability to remotely 
update the flow on the agent for example.

You can configure MiNiFi agents for SSL over Site-to-Site in order to talk to 
secure NiFi instances. Not sure about the need for a user but you would need to 
present a certificate the same as you would for connecting to the NiFi UI. Some 
security features still need to be implemented (like encrypted properties 
maybe) but you should definitely be able to do what you’re trying to do with 
MiNiFi, happy to help with any issues you may run into.

Regards,
Matt


> On Apr 17, 2022, at 11:40 AM, Jorge Machado  wrote:
> 
> I did this in the past and I ended up switching to NiFi. I think you should do 
> the same. MiNiFi is kind of “dead”, not being developed anymore. I found it 
> better to just switch to a single instance of NiFi. 
> 
> Regards
> Jorge 
> 
>> On 17. Apr 2022, at 03:30, David Early  wrote:
>> 
>> We are considering using several dozen minifi instances to gather data at 
>> remote locations and send it to a cloud based central NiFi.
>> 
>> The problem I am THINKING we will have is setting up ssl. The only way I 
>> know of to set up ssl for site to site requires a user be configured for the 
>> incoming data on the destination NiFi and permissions given to that user to 
>> be able to send data.
>> 
>> Am I missing something? Will we have to manually set up a user in the cloud 
>> NiFi for each MiNiFi instance so we can use SSL transport?
>> 
>> Dave
> 


Re: Crash on startup due to Output port issue

2022-08-01 Thread Matt Burgess
Benji,

I've built a custom framework NAR [1] that has additional logging to
identify which components, process groups, etc. are causing the issue.
If you'd like, please feel free to save off your existing framework
NAR and replace it with this, then share the relevant section of the
log (matching the IDs to the components in your flow since it's
sensitive and can't be shared). Depending on the structure of your
flow (is it possible to share relevant screenshots?) that might help
us identify, reproduce, and/or fix the issue.

I filed a Jira to add more logging [2], I'll take this patch and
create a PR for it.

Regards,
Matt

[1] 
https://drive.google.com/file/d/1f5xs0S2HPWIjTGKRuhCm6_LmxQ5XnbeD/view?usp=sharing
[2] https://issues.apache.org/jira/browse/NIFI-10306

On Mon, Aug 1, 2022 at 12:46 PM BeNJ  wrote:
>
> Updates:
> Mark: Deleting the json allows nifi to start up(!!) - note that restarting 
> (with the newly generated json in place) causes the same issue on startup.
> Joe: I'm happy to run custom debug nars to help figure out how to resolve 
> this.
>
> Thank you, I really appreciate everyone's help with this!
> Benji
>
>
>
> On Mon, Aug 1, 2022 at 8:00 AM Joe Witt  wrote:
>>
>> Ben
>>
>> I didn't see any log levels that would help narrow this down.  If Mark's 
>> suggestion does not address it it will likely require updating the flow 
>> configuration manually or a patched framework nar to do better logging.
>>
>> I'm also wondering if the json form of the flow could be imported on a clean 
>> nifi (you'd have to re-enter sensitive values) or imported to a registry 
>> then imported to nifi.
>>
>> Given you have hit this and someone else did (on stackoverflow) we clearly 
>> have an important case to figure out here as it is obviously disruptive.
>>
>> Thanks
>>
>> On Mon, Aug 1, 2022 at 5:41 AM Mark Payne  wrote:
>>>
>>> Benji,
>>>
>>> I would recommend you try to remove (or rename) the flow.json.gz - but not 
>>> the flow.xml.gz. See if that makes any difference.
>>>
>>> Thanks
>>> -Mark
>>>
>>> Sent from my iPhone
>>>
>>> On Jul 31, 2022, at 11:35 PM, BeNJ  wrote:
>>>
>>> 
>>> Also please see the attached nifi.properties
>>>
>>> Thanks,
>>> Benji
>>>
>>> On Sun, Jul 31, 2022 at 4:28 PM BeNJ  wrote:

 Hi Joe,
 Stack with a couple of info logs from before and after, and the final exit 
 shortly after.
 --
 2022-07-31 16:20:35,311 INFO [main] 
 o.a.n.g.StandardProcessGroupSynchronizer Added 
 Connection[ID=cfee198f-3d2b-1513-f741-e71ad122, Source 
 ID=cfee198e-3d2b-1513-7a9c-f0c2a8cf0d43, Dest 
 ID=cfee19cf-3d2b-1513-4a87-5f50a90fdabf] to 
 StandardProcessGroup[identifier=cfee1961-3d2b-1513-c8a6-fdf1a8fe4ff5,name=Add
  Customer User]
 2022-07-31 16:20:35,311 INFO [main] 
 o.a.n.g.StandardProcessGroupSynchronizer Added 
 Connection[ID=cfee1974-3d2b-1513-e4df-9dbba1241682, Source 
 ID=cfee1971-3d2b-1513-555c-1aedf0f0801f, Dest 
 ID=cfee1970-3d2b-1513-de1d-f5bee9ad679e] to 
 StandardProcessGroup[identifier=cfee1961-3d2b-1513-c8a6-fdf1a8fe4ff5,name=Add
  Customer User]
 2022-07-31 16:20:35,317 INFO [Timer-Driven Process Thread-9] 
 o.a.n.c.s.StandardControllerServiceNode Successfully enabled 
 StandardControllerServiceNode[service=StandardOauth2AccessTokenProvider[id=cfee1b5b-3d2b-1513-7124-85b028901ac8],
  name=customer user management idp, active=true]
 2022-07-31 16:20:35,325 INFO [main] o.a.n.c.s.AffectedComponentSet 
 Starting the following components: AffectedComponentSet[inputPorts=[], 
 outputPorts=[], remoteInputPorts=[], remoteOutputPorts=[], processors=[], 
 controllerServices=[], reportingTasks=[]]
 2022-07-31 16:20:35,328 WARN [main] org.eclipse.jetty.webapp.WebAppContext 
 Failed startup of context 
 o.e.j.w.WebAppContext@ffaaaf0{nifi-api,/nifi-api,file:///opt/nifi/nifi-current/work/jetty/nifi-web-api-1.16.0.war/webapp/,UNAVAILABLE}{./work/nar/extensions/nifi-server-nar-1.16.0.nar-unpacked/NAR-INF/bundled-dependencies/nifi-web-api-1.16.0.war}
 org.apache.nifi.controller.serialization.FlowSynchronizationException: 
 java.lang.IllegalStateException: Cannot add Connection to Process Group 
 because source is an Output Port that does not belong to a child Process 
 Group
 at 
 org.apache.nifi.controller.serialization.VersionedFlowSynchronizer.synchronizeFlow(VersionedFlowSynchronizer.java:362)
 at 
 org.apache.nifi.controller.serialization.VersionedFlowSynchronizer.sync(VersionedFlowSynchronizer.java:185)
 at 
 org.apache.nifi.controller.serialization.StandardFlowSynchronizer.sync(StandardFlowSynchronizer.java:43)
 at 
 org.apache.nifi.controller.FlowController.synchronize(FlowController.java:1479)
 at 
 org.apache.nifi.persistence.StandardFlowConfigurationDAO.load(StandardFlowConfigurationDAO.java:104)
 at 
 

Re: PutSQL - BatchUpdateException: invalid arguments in call

2022-08-17 Thread Matt Burgess
Sergio,

Your email says the flowfiles each contain a record to insert, but
PutSQL takes a full SQL statement such as INSERT INTO tableName VALUES
('hello', 'world', 1). If you have a record of data rather than a SQL
statement, you can use PutDatabaseRecord for that instead. If you do
have SQL statements in your flowfile, can you share an example?
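
For reference, a FlowFile bound for PutSQL carries either a literal statement
as its content, or a parameterized one plus sql.args.N.* attributes, e.g.
(table/column names are placeholders; the type codes are java.sql.Types values):

    content:           INSERT INTO my_table (name, qty) VALUES (?, ?)
    sql.args.1.type:   12        (VARCHAR)
    sql.args.1.value:  widget
    sql.args.2.type:   4         (INTEGER)
    sql.args.2.value:  3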

Thanks,
Matt

On Tue, Aug 9, 2022 at 4:12 PM Sergio M.  wrote:
>
> Hi Community,
>
> We are facing an error with the PutSQL processor while inserting a batch of 
> 20 flowfiles per transaction. Each flowfile contains a record to insert.
>
> Quite often, we get an error that it couldn't insert a record into the 
> database, so the flowfile exits on Retry, but when it retries, it succeeds.
>
>
> The error we get from Nifi is the following:
>
> 2022-08-09 12:23:15,803 ERROR [Timer-Driven Process Thread-24] 
> o.apache.nifi.processors.standard.PutSQL 
> PutSQL[id=aae45348-0181-1000--aaa08fa0] Failed to update database due 
> to a failed batch update, java.sql.BatchUpdateException: invalid arguments in 
> call. There were a total of 1 FlowFiles that failed, 0 that succeeded, and 1 
> that were not execute and will be routed to retry; : 
> java.sql.BatchUpdateException: invalid arguments in call
> java.sql.BatchUpdateException: invalid arguments in call
> at 
> oracle.jdbc.driver.OraclePreparedStatement.generateBatchUpdateException(OraclePreparedStatement.java:10345)
> at 
> oracle.jdbc.driver.OraclePreparedStatement.executeBatchWithoutQueue(OraclePreparedStatement.java:10107)
> at 
> oracle.jdbc.driver.OraclePreparedStatement.executeLargeBatch(OraclePreparedStatement.java:9987)
> at 
> oracle.jdbc.driver.OraclePreparedStatement.executeBatch(OraclePreparedStatement.java:9939)
> at 
> oracle.jdbc.driver.OracleStatementWrapper.executeBatch(OracleStatementWrapper.java:261)
> at 
> org.apache.commons.dbcp2.DelegatingStatement.executeBatch(DelegatingStatement.java:242)
> at 
> org.apache.commons.dbcp2.DelegatingStatement.executeBatch(DelegatingStatement.java:242)
> at 
> org.apache.nifi.processors.standard.PutSQL.lambda$null$10(PutSQL.java:390)
> at 
> org.apache.nifi.processor.util.pattern.ExceptionHandler.execute(ExceptionHandler.java:127)
> at 
> org.apache.nifi.processors.standard.PutSQL.lambda$new$12(PutSQL.java:388)
> at 
> org.apache.nifi.processor.util.pattern.PutGroup.putFlowFiles(PutGroup.java:94)
> at org.apache.nifi.processor.util.pattern.Put.onTrigger(Put.java:102)
> at 
> org.apache.nifi.processors.standard.PutSQL.lambda$onTrigger$20(PutSQL.java:600)
> at 
> org.apache.nifi.processor.util.pattern.PartialFunctions.onTrigger(PartialFunctions.java:114)
> at 
> org.apache.nifi.processor.util.pattern.RollbackOnFailure.onTrigger(RollbackOnFailure.java:184)
> at 
> org.apache.nifi.processors.standard.PutSQL.onTrigger(PutSQL.java:600)
> at 
> org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1176)
> at 
> org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:213)
> at 
> org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
> at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
> at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
>
> Do you know what is the cause of the error? How does the PutSQL processor 
> work behind the scenes?
>
> Some data of the flow to take into account in case it contributes:
>
> 1 million data approx every 24 hours
> 20 mb of data approx (attached image)
> 1 flowfile > 1 row
>
> I attach the PutSQL configuration and and the Status History of "Bytes In" 
> for the last 5 minutes.
>
> Is there any configuration or way in PutSQL to capture the error message 
> returned by the database? For example, through an attribute of the flow file 
> that failed.
>
> Thanks!
> Sergio
>


Re: json into a json-enabled DB

2022-12-15 Thread Matt Burgess
Geoffrey,

The biggest problem with JSON columns across the board is that the
JDBC and java.sql.Types specs don't handle them natively, and NiFi
records don't recognize JSON as a particular type, we are only
interested in the overall datatype such as String since NiFi records
can be in any supported format. In my experience these are handled by
setting the JSON column to type java.sql.OTHER (like PostgreSQL) and
they are willing to accept the value as a String (see NIFI-5901 [1]),
and we put in code to handle it as such (see NIFI-5845 [2]). For NiFi
it's been more of an ad-hoc type of support where maybe if the SQL
type is custom and unique we can handle such things (like sql_variant
in MSSQL via NIFI-5819 [3]), but due to the nature of the custom type
it's difficult to handle in any sort of consistent way. Happy to hear
your thoughts and input, perhaps we can add some ad-hoc support for
your use case?
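
For the wrap-it-as-a-string approach you describe, the record handed to
PutDatabaseRecord would just carry the original document as an escaped string
field, something like (field names are placeholders):

    {"id": 1, "product_name": "widget", "attributes": "{\"material\":\"plastic\"}"}

and the target column (e.g. jsonb in PostgreSQL) receives that string value,
which is the situation the NIFI-5845/NIFI-5901 handling above was meant to cover.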

Regards,
Matt

[1] https://issues.apache.org/jira/browse/NIFI-5901
[2] https://issues.apache.org/jira/browse/NIFI-5845
[3] https://issues.apache.org/jira/browse/NIFI-5819

On Wed, Dec 14, 2022 at 3:55 PM Greene (US), Geoffrey N
 wrote:
>
> Some databases (postgres, sql server,  others) support native json columns.
>
> With postgres, there’s a native jsonb type, with sql server it’s a string 
> type, that you can treat as json.
>
>
>
> In any event, once you have the json in the database, one can then query it, 
> e.g.:
>
>
>
> SELECT id,product_name,
>
>JSON_VALUE(attributes, '$.material') AS material
>
> FROM jsontest;
>
>
>
> So, here’s my question:
>
>
>
> If you have a flow file that contains json, whats the best way to insert that 
> into a database?
>
> The only thing I’ve thought of so far is if you have the json string
>
> {"material" : "plastic"}
>
> You then use a TEXT processor to turn that into
>
> {"attributes": "{\"material\" : \"plastic\"}"}
>
> And then use a PutDatabaseRecord to actually write the entry.
>
>
>
> Is there a better, or more efficient way to do it?
>
>
>
>
>
>


Re: Expected mergerecord performance

2022-12-20 Thread Matt Burgess
Thanks Vijay! I agree those processors should do the trick, but there
were things in the transformation between input and desired output
whose origin I wasn't sure of. If you are setting constants you
can use either a Shift or Default spec; if you are moving fields
around you can use a Shift spec; and in general, whether you end up
with one spec or multiple, I find it's easiest to use a Chain spec (an
array of specs) in the processor configuration. You can play around
with the spec(s) at the Jolt playground [1].

An important difference to note between JoltTransformJSON and
JoltTransformRecord is that for the former, the spec is applied to the
entire input (and it is entirely read into memory) where in
JoltTransformRecord the spec is applied to each record.
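
Purely as a sketch against the structures you posted below (untested; note that
doc_id and the two null metadata_* fields are not present in the source JSON,
so they would have to come from attributes or a later step), a Chain spec with
a single shift could look something like:

    [
      {
        "operation": "shift",
        "spec": {
          "result": {
            "text": "doc_text",
            "timestamp": "processing_timestamp",
            "metadata": {
              "X-OCR-Applied": "metadata_x_ocr_applies",
              "X-TIKA:Parsed-By": {
                "0": "metadata_x_parsed_by"
              },
              "Content-Type": "metadata_content_type",
              "Page-Count": "metadata_page_count",
              "dcterms:created": "metadata_creation_date"
            }
          }
        }
      }
    ]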

Regards,
Matt

[1] http://jolt-demo.appspot.com/#inception

On Tue, Dec 20, 2022 at 10:52 AM Vijay Chhipa  wrote:
>
> Hi Richard
> Have you tried JoltTransformJSON or JoltTransformRecord
>
> I believe you should be able to do this
>
> Quick start here:  
> https://community.cloudera.com/t5/Community-Articles/Jolt-quick-reference-for-Nifi-Jolt-Processors/ta-p/244350
>
>
> On Dec 20, 2022, at 4:13 AM, Richard Beare  wrote:
>
> Hi Everyone,
> Still struggling to fix this issue and may need to try some different things.
>
> What is the recommended way of transforming a record structure? At the moment 
> I have a groovy script doing this but the downstream processing is very slow, 
> as discussed in the preceding thread.
>
> The transformation is very simple - starting structure is:
>
> {
>  "result" : {
> "text" : " document text",
>   "metadata" : {
>  "X-TIKA:Parsed-By": [
>  "org.apache.tika.parser.pdf.PDFParser"
>  ],
> "X-OCR-Applied" : true,
> "dcterms:created": "2018;07-24T15:04:51Z",
> "Content-Type" : "application/pdf",
> "Page-Count" : 2,
>   },
> "success" : true,
> "timestamp" :  "2022-12-20T09:02:27.902Z",
> "processingElapsedTime" : 6
> }
> }
>
>
> final structure is':
>
> [ {
> "doc_id" : 58,
> "doc_text" : "   ",
> "processing_timestamp" : "2022-12-20T09:02:27.902Z",
> "metadata_x_ocr_applies" : true,
> "metadata_x_parsed_by" : "org.apache.tika.parser.pdf.PDFParser",
> "metadata_content_type" : "application/pdf",
> "metadata_page_count" : 1
> "metadata_creation_date": null,
> "metadata_last_modified: nill
> }]
>
> So a kind of flattening of the structure. Is there a processor I should be 
> using to do this instead of a groovy script?
>
> Thanks
>
> On Wed, Dec 14, 2022 at 7:57 AM Richard Beare  wrote:
>>
>> Any thoughts on this? Are there some extra steps required when creating an 
>> avro file from a user defined schema?
>>
>> On Thu, Dec 8, 2022 at 2:56 PM Richard Beare  wrote:
>>>
>>> Here's another result that I think suggests there's something wrong with 
>>> the avro files created by the groovy script, although I can't see what the 
>>> problem might be.
>>>
>>> The test is as follows. Output of the groovy script creating avro files is 
>>> passed to convertrecord, configured with an avro reader and json writer. 
>>> This is slow. The json output is then converted back to avro with another 
>>> convertrecord processor, configured with a jsontreereader and an avro 
>>> writer - this is fast, instantly emptying the queue. The result of that is 
>>> fed into the previously problematic merge processor which works exactly as 
>>> expected, producing flowfiles with 100 records each.
>>>
>>> The difference I can see between the two flow files is the way in which the 
>>> schema is specified. Perhaps some extras are required in the groovy file to 
>>> set that up?
>>>
>>> The slow one has:
>>>
>>> {"type":"record","name":"document", "fields":[{
>>>
>>> The fast one
>>> {"type":"record","name":"nifiRecord","namespace":"org.apache.nifi","fields":
>>>
>>>
>>> Initial characters are also slightly different.
>>> Slow one:
>>>
>>> 000   O   b   j 001 002 026   a   v   r   o   .   s   c   h   e   m
>>> 020   a 346  \n   {   "   t   y   p   e   "   :   "   r   e   c   o
>>>
>>> Fast one
>>>
>>> 000   O   b   j 001 004 026   a   v   r   o   .   s   c   h   e   m
>>> 020   a 362  \b   {   "   t   y   p   e   "   :   "   r   e   c   o
>>>
>>>
>>> The groovy script is
>>> CogStack-NiFi/parse-tika-result-json-to-avro.groovy at master · 
>>> CogStack/CogStack-NiFi · GitHub
>>>
>>> The schema is
>>> CogStack-NiFi/document.avsc at master · CogStack/CogStack-NiFi · GitHub
>>>
>>>
>>> On Thu, Dec 8, 2022 at 1:59 PM Richard Beare  
>>> wrote:

 I'm diving into the convertrecord tests a bit deeper on the production 
 server.

 The first test case - 259 documents, total of 1M when in avro format in 
 the input queue to the convert record processor. These avro files were not 
 created by the groovy script - they start life as a database query and the 
 text field is in one of the columns. The convertrecord processor runs very 
 quickly - click start, press refresh and it is done. 

Re: Joining multiple tables with left join relation

2022-12-08 Thread Matt Burgess
Michał,

There are some options in NiFi that should work for you, we have
LookupRecord [1] which does simple enrichment based on a common field
(such as id) that acts like a SQL left join, there are a series of
articles [2] to explain in more detail. For more complex use cases
there are ForkEnrichment [3] and JoinEnrichment [4] processors when a
simple left join is not adequate for the use case. Please let us know
if you are having trouble with any of these or if they don't quite fit
your use case.
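
For the simple case described below (tables sharing only an id column), a
hedged sketch of what the configuration can look like: with LookupRecord, a
lookup service keyed on the id field adds the enrichment fields to each
record, and with JoinEnrichment's SQL strategy the join is written against
the two incoming streams, roughly like

SELECT o.*, e.*
FROM original o
LEFT OUTER JOIN enrichment e ON o.id = e.id

(treat that statement as an illustration rather than a drop-in query, and
check the JoinEnrichment documentation for the exact table names and dialect
in your version).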


Regards,
Matt

[1] 
https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.19.1/org.apache.nifi.processors.standard.LookupRecord/index.html
[2] 
https://community.cloudera.com/t5/Community-Articles/Data-flow-enrichment-with-NiFi-part-1-LookupRecord-processor/ta-p/246940
[3] 
https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.19.1/org.apache.nifi.processors.standard.ForkEnrichment/index.html
[4] 
https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.19.1/org.apache.nifi.processors.standard.JoinEnrichment/index.html

On Thu, Dec 8, 2022 at 8:56 AM KURTYKA, M. (MICHAŁ) via users
 wrote:
>
> Hi,
>
>
>
> I’m trying to join one “master” table with multiple enrichment tables by 
> common id with relation that would resemble SQL left join. Is something like 
> that possible in NiFi? Which processors will be most efficient for that? Only 
> column they share is id.
>
>
>
> Kind regards
>
> Michał Kurtyka
>


Re: PrometheusReportingTask authorisation?

2022-11-23 Thread Matt Burgess
Michael,

For the authorization use case, I recommend against using the
reporting task and instead using the built in endpoint for metrics
(see https://issues.apache.org/jira/browse/NIFI-7273 for more
details). The NiFi REST API (to include that endpoint) is subject to
the authentication and authorization policies configured for the NiFi
instance.
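
As an illustration only (verify the exact path against the REST API
documentation for your release), the Prometheus-formatted metrics are served
from the standard API at an address along the lines of
https://your-nifi-host:8443/nifi-api/flow/metrics/prometheus, so the caller's
certificate or token is checked against the same access policies as any other
REST request.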

Regards,
Matt

On Tue, Nov 22, 2022 at 9:51 AM Garland, Michael R
 wrote:
>
> Hi,
>
>
>
> I’ve noticed that whilst the PrometheusReportingTask can be configured for 
> client authentication, there is no means to authorise connections to the 
> /metrics endpoint.  Given the /metrics endpoint can expose processor group 
> and processor names, which could be sensitive in nature, should exposing 
> endpoints like this also be authorised, for example by using the global 
> access policies functionality?
>
>
>
> This strikes me as being something that is maybe niche, but nevertheless 
> without authorisation, could undermine the security aspects of NiFi by 
> providing access to information to say an insider threat (valid client 
> certificate, but not authorised).
>
>
>
> Would be interested to know other’s thoughts on this?
>
>
>
> Michael


Re: ECMAScript support missing from ExecuteScript

2023-01-16 Thread Matt Burgess
Did you upgrade the version of Java when you upgraded NiFi? Later versions of 
Java don’t include the Nashorn (ECMAScript) library, but I believe we added it 
explicitly, perhaps for the 1.20 release (not at computer right now)

Sent from my iPhone

> On Jan 16, 2023, at 6:28 PM, Vijay Chhipa  wrote:
> 
> Hi All, 
> 
> Has ECMAScript been removed from one of the scripting languages supported in 
> the ExecuteScript processor ?
> 
> I had a dataflow use ECMAScript in ExecuteScript processor in NiFi 1.16.1 and 
> after the upgrade to 1.19.1 the processor became invalid. 
> 
> Please see the screenshot below 
> 
> 
> 
> 
> The allowable values in the Script Engine properties are now these
> 
> 
> 
> 
> 
> I didn’t see any mention of ECMAScript removal in any release notes. 
> 
> Wondering if this was an oversight or should I choose a different scripting 
> language for my dataflow. 
> 
> 
> Thank you. 
> Vijay
> 
> 


Re: Execute DB2 stored procedue

2023-02-27 Thread Matt Burgess
Stored procedures that take no output parameters and return ResultSets should 
work fine with ExecuteSQL, but for DBs that allow OUT and INOUT parameters, 
those won’t make it into the outgoing FlowFile (in either content or 
attributes).

Regards,
Matt


> On Feb 27, 2023, at 4:19 PM, Dmitry Stepanov  wrote:
> 
> 
> We run our procedure using ExecuteSQL.
> Just make sure to use proper SQL syntax 
> 
>> On February 27, 2023 2:09:19 p.m. Phillip Lord  
>> wrote:
>> 
>> Hello,
>> 
>> Does anyone have any experience executing a DB2 stored procedure?  
>> Potentially using PutSQL? I don't think it can be done using ExecuteSQL, and 
>> I can likely use an executeStreamCommand to accomplish this.  But trying not 
>> to reinvent the wheel if I can just do it using a simple Nifi processor
>> 
>> Thanks
>> Phil
> 
> 


Re: Execute DB2 stored procedue

2023-02-28 Thread Matt Burgess
Philip,

Those are OUT parameters so ExecuteSQL/PutSQL doesn't know how to get
the values out after calling the procedure. We'd likely want a
separate processor like ExecuteStoredProcedure and would have to
figure out how to handle OUT parameters, maybe adding those fields to
the outgoing records or something.
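
As a stopgap until such a processor exists, one option is to call the
procedure from ExecuteScript, where plain JDBC exposes the OUT parameters. A
minimal Groovy sketch, assuming the SMV.RUN_ALL_PS signature quoted below
(the DriverManager connection, URL and credentials are placeholders for
illustration; in practice you would borrow a Connection from a
DBCPConnectionPool instead):

import java.sql.DriverManager
import java.sql.Types

// Placeholder connection details; normally obtained from a DBCPConnectionPool
def conn = DriverManager.getConnection('jdbc:db2://dbhost:50000/MYDB', 'user', 'password')
def stmt = conn.prepareCall('{call SMV.RUN_ALL_PS(?, ?, ?, ?, ?)}')
stmt.setString(1, 'N')                       // IN_RESET
stmt.registerOutParameter(2, Types.CHAR)     // OUT_SQLSTATE
stmt.registerOutParameter(3, Types.INTEGER)  // OUT_RETURN_CODE
stmt.registerOutParameter(4, Types.VARCHAR)  // OUT_ERROR_TEXT
stmt.registerOutParameter(5, Types.VARCHAR)  // OUT_SQL_STMT
stmt.execute()
def sqlState   = stmt.getString(2)
def returnCode = stmt.getInt(3)
def errorText  = stmt.getString(4)
stmt.close()
conn.close()

The OUT values read back this way can then be written to FlowFile attributes
for use downstream.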

Regards,
Matt

On Tue, Feb 28, 2023 at 11:16 AM Phillip Lord  wrote:
>
> Thanks for replies...
>
> I'm trying putSQL to call the following stored-procedure...
>
> CREATE OR REPLACE PROCEDURE SMV.RUN_ALL_PS
>  (IN   IN_RESET  CHAR(1),   ->  This will always be 
> 'N' when called from nifi
>OUT  OUT_SQLSTATE CHAR(5),
>OUT  OUT_RETURN_CODEINTEGER,
>OUT  OUT_ERROR_TEXTVARCHAR(1000),
>OUT  OUT_SQL_STMT   VARCHAR(3)
>   )
>
>
> so I'm trying this in putSQL
>
> CALL MYPROCEDURE.PROC1('N', ?,?,?,?)
>
> and I need to supply sql arg attributes... like...
>
> sql.args.1.type = 1
> sql.args.1.value = not sure what to put here
> sql.args.2.type = 4
> sql.args.2.value = not sure what to put here
> etc...
>
> Am I on the right track?
>
> Thanks
>
>
>
>
>
>
> On Mon, Feb 27, 2023 at 8:50 PM Matt Burgess  wrote:
>>
>> Stored procedures that take no output parameters and return ResultSets 
>> should work fine with ExecuteSQL, but for DBs that allow OUT and INOUT 
>> parameters, those won’t make it into the outgoing FlowFile (in either 
>> content or attributes).
>>
>> Regards,
>> Matt
>>
>>
>> On Feb 27, 2023, at 4:19 PM, Dmitry Stepanov  wrote:
>>
>> 
>> We run our procedure using ExecuteSQL.
>> Just make sure to use proper SQL syntax
>>
>> On February 27, 2023 2:09:19 p.m. Phillip Lord  
>> wrote:
>>>
>>> Hello,
>>>
>>> Does anyone have any experience executing a DB2 stored procedure?  
>>> Potentially using PutSQL? I don't think it can be done using ExecuteSQL, 
>>> and I can likely use an executeStreamCommand to accomplish this.  But 
>>> trying not to reinvent the wheel if I can just do it using a simple Nifi 
>>> processor
>>>
>>> Thanks
>>> Phil
>>
>>


Re: Best practice for configuring jolt processor?

2023-02-23 Thread Matt Burgess
I'm resurrecting the PR to add a property to specify a file location
for the Jolt spec. In the meantime you could maintain the spec as the
value of a context variable and point to it in the Jolt Spec property.
Then you can share amongst the nodes and maintain the spec in one
place.
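
As a small illustration (the parameter name jolt.spec is made up), if the
spec text lives in a parameter of the bound parameter context, the Jolt
Specification property can simply be set to #{jolt.spec}, so every node in
the cluster resolves the same spec from one place.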

Regards,
Matt

On Wed, Feb 22, 2023 at 3:17 PM Richard Beare  wrote:
>
> Hi,
> I'm hoping to be able to look at this further with someone I'm working with 
> who has more java background. I notice there's a reference to another PR from 
> Matt, so perhaps the bulk of the work has been done? What is the status and 
> what is the next step required?
>
> On Thu, Feb 2, 2023 at 11:28 AM Dirk Arends  wrote:
>>
>> Hi Richard,
>>
>> Some work was previously done to the similar processor JoltTransformJSON but 
>> from what I can see, hasn't been merged into main or included in a release.
>>
>> NIFI-4957: Enable JoltTransformJSON to pickup a Jolt Spec file from a file 
>> location
>>
>> https://issues.apache.org/jira/browse/NIFI-4957
>>
>> I understand this isn't really helpful, but if you have the capacity, it 
>> would be worth having another crack at these changes.
>>
>> Regards,
>> Dirk Arends
>>
>>
>> On Mon, 30 Jan 2023 at 21:50, Richard Beare  wrote:
>>>
>>> Hi,
>>> What is the preferred way of managing the jolttransformrecord processor 
>>> specification? Is there a convenient way to store the spec in a file that 
>>> can be version controlled etc?
>>
>>
>>
>> --
>> Regards,
>>
>> --
>> Dirk Arends


Re: Generalizing QueryRecord to changing inferred CSV headers

2023-04-18 Thread Matt Burgess
I should mention for aggregate values like COUNT(), check out the
CalculateRecordStats processor, not sure if it takes a `/` value (or
whatever means 'select all fields') for a RecordPath or not, if not we
should probably support if prudent. It might also be a nice
improvement to add MAX/MIN support as well, feel free to file a Jira
for that improvement if it will help.

- Matt

On Tue, Apr 18, 2023 at 6:16 PM Matt Burgess  wrote:
>
> Jim,
>
> QueryRecord uses Apache Calcite under the hood and is thus at the
> mercy of the SQL standard (and any additional rules/dialect from
> Apache Calcite) so in general you can't select "all except X" or "all
> except change X to Y". Does it need to be SQL executed against the
> individual fields? If not, take a look at ScriptedTransformRecord doc
> (and its Additional Details page). IIRC you're a Groovy guy now ;) so
> you should be able to alter the fields as you see fit using Groovy
> rather than SQL (alternatively Jython as you've done a bunch of that
> as well).
>
> Regards,
> Matt
>
> On Tue, Apr 18, 2023 at 6:04 PM James McMahon  wrote:
> >
> > Hello. I recently asked the community a question about processing CSV 
> > files. I received some helpful advice about using processors such as 
> > ConvertRecord and QueryRecord, and was encouraged to employ Readers and 
> > RecordSetWriters. I've done that, and thank all who replied.
> >
> > My incoming CSV files come in with different headers because they are 
> > widely different data sets. The header structure is not known in advance. 
> > As such, I configure a QueryRecord processor with a CSVReader that employs 
> > a Schema Access Strategy that is Use String Fields From Header. I configure 
> > a CSVRecordSetWriter that sets Infer Record Schema as its Schema Access 
> > Strategy.
> >
> > Now I want to use that QueryRecord processor to characterize the various 
> > fields using SQL. Record counts, min and max values - things of that 
> > nature. But in all the examples I find in YouTube and in the open source, 
> > the authors presume a knowledge of the fields in advance. For example 
> > Property year is set by Value select "year" from FLOWFILE.
> >
> > We simply don't have that luxury, that awareness in advance. After all, 
> > that's the very reason we inferred the schema in the reader and writer 
> > configuration. The fields are more often than not going to be very 
> > different. Hard wiring them into QueryRecord is not a flow solution that is 
> > flexible enough. We need to grab them from the inferred schema the Reader 
> > and Writer services identified.
> >
> > What syntax or notation can we use in the QueryRecord sql to say "for each 
> > field found in the header, execute this sql against that field"? I guess 
> > what I'm looking for is iteration through all the inferred schema fields, 
> > and dynamic assignment of the field name in the SQL.
> >
> > Has anyone faced this same challenge? How did you solve it?
> > Is there another way to approach this problem?
> >
> > Thank you in advance,
> > Jim


Re: Generalizing QueryRecord to changing inferred CSV headers

2023-04-18 Thread Matt Burgess
Jim,

QueryRecord uses Apache Calcite under the hood and is thus at the
mercy of the SQL standard (and any additional rules/dialect from
Apache Calcite) so in general you can't select "all except X" or "all
except change X to Y". Does it need to be SQL executed against the
individual fields? If not, take a look at ScriptedTransformRecord doc
(and its Additional Details page). IIRC you're a Groovy guy now ;) so
you should be able to alter the fields as you see fit using Groovy
rather than SQL (alternatively Jython as you've done a bunch of that
as well).
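
To give a feel for the shape of it, a minimal Groovy body for
ScriptedTransformRecord that walks whatever fields the inferred schema
produced, without naming them up front (here it just trims string values; the
record variable is the one the processor binds for each record, and the value
returned is what gets written out):

record.schema.fieldNames.each { name ->
    def value = record.getValue(name)
    if (value instanceof String) {
        record.setValue(name, value.trim())
    }
}
return record

Aggregates such as per-field counts or min/max would need to be accumulated
across records (CalculateRecordStats may help there) rather than computed
inside a per-record script like this.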

Regards,
Matt

On Tue, Apr 18, 2023 at 6:04 PM James McMahon  wrote:
>
> Hello. I recently asked the community a question about processing CSV files. 
> I received some helpful advice about using processors such as ConvertRecord 
> and QueryRecord, and was encouraged to employ Readers and RecordSetWriters. 
> I've done that, and thank all who replied.
>
> My incoming CSV files come in with different headers because they are widely 
> different data sets. The header structure is not known in advance. As such, I 
> configure a QueryRecord processor with a CSVReader that employs a Schema 
> Access Strategy that is Use String Fields From Header. I configure a 
> CSVRecordSetWriter that sets Infer Record Schema as its Schema Access 
> Strategy.
>
> Now I want to use that QueryRecord processor to characterize the various 
> fields using SQL. Record counts, min and max values - things of that nature. 
> But in all the examples I find in YouTube and in the open source, the authors 
> presume a knowledge of the fields in advance. For example Property year is 
> set by Value select "year" from FLOWFILE.
>
> We simply don't have that luxury, that awareness in advance. After all, 
> that's the very reason we inferred the schema in the reader and writer 
> configuration. The fields are more often than not going to be very different. 
> Hard wiring them into QueryRecord is not a flow solution that is flexible 
> enough. We need to grab them from the inferred schema the Reader and Writer 
> services identified.
>
> What syntax or notation can we use in the QueryRecord sql to say "for each 
> field found in the header, execute this sql against that field"? I guess what 
> I'm looking for is iteration through all the inferred schema fields, and 
> dynamic assignment of the field name in the SQL.
>
> Has anyone faced this same challenge? How did you solve it?
> Is there another way to approach this problem?
>
> Thank you in advance,
> Jim


Re: How to cherry pick a specific line from a flowfile?

2023-02-09 Thread Matt Burgess
session.write() doesn’t take just an InputStream, it either takes both an
InputStream and OutputStream (if using a StreamCallback like you are) or just
an OutputStream (using an OutputStreamCallback, usually for source processors
that don’t have FlowFile input)

Sent from my iPhone

On Feb 9, 2023, at 9:34 PM, James McMahon  wrote:

Mark, your RouteText blog worked perfectly. Thank you very much.

Matt, I still want to get the BufferedReader working. I'm close. Here is my
code, with the error that results. I do not know what this error means. Any
thoughts?

import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets

def ff=session.get()
if(!ff)return
try {
    // Here we are reading from the current flowfile content and writing to the new content
    //ff = session.write(ff, { inputStream, outputStream ->
    ff = session.write(ff, { inputStream ->
        def bufferedReader = new BufferedReader(new InputStreamReader(inputStream))
        // Header is the first line...
        def header = bufferedReader.readLine()
        ff = session.putAttribute(ff, 'mdb.table.header', header)

        //def bufferedWriter = new BufferedWriter(new OutputStreamWriter(outputStream))
        //def line
        int i = 0
        // While the incoming line is not empty, write it to the outputStream
        //while ((line = bufferedReader.readLine()) != null) {
        //    bufferedWriter.write(line)
        //    bufferedWriter.newLine()
        //    i++
        //}
        // By default, INFO doesn't show in the logs and WARN will appear in the processor bulletins
        //log.warn("Wrote ${i} lines to output")
        bufferedReader.close()
        //bufferedWriter.close()
    } as StreamCallback)
    session.transfer(ff, REL_SUCCESS)
} catch (Exception e) {
    log.error('Error occurred extracting header for table in mdb file', e)
    session.transfer(ff, REL_FAILURE)
}

The ExecuteScript processor throws this error:

02:30:38 UTC  ERROR  4d6c3e21-a72e-16b2-6a7f-f5cadb351c0e
ExecuteScript[id=4d6c3e21-a72e-16b2-6a7f-f5cadb351c0e] Error occurred
extracting header for table in mdb file: groovy.lang.MissingMethodException:
No signature of method: Script10$_run_closure1.doCall() is applicable for
argument types: (org.apache.nifi.controller.repository.io.TaskTerminationInputStream...)
values: [org.apache.nifi.controller.repository.io.TaskTerminationInputStream@5a2d22a7, ...]
Possible solutions: doCall(java.lang.Object), findAll(), findAll()

On Thu, Feb 9, 2023 at 8:35 PM Mark Payne <marka...@hotmail.com> wrote:

James,

Have a look at the RouteText processor. I wrote a blog post recently on using
it: https://medium.com/cloudera-inc/building-an-effective-nifi-flow-routetext-5068a3b4efb3

Thanks
Mark

Sent from my iPhone

On Feb 9, 2023, at 8:06 PM, James McMahon <jsmcmah...@gmail.com> wrote:

My version of nifi does not have Range Sampling unfortunately.
If I get the flowfile through a session as done in the Cookbook, does anyone
know of an approach in Groovy to grab line N and avoid loading the entire CSV
file into string variable text?

On Thu, Feb 9, 2023 at 7:18 PM Matt Burgess <mattyb...@gmail.com> wrote:

I’m AFK ATM but Range Sampling was added into the SampleRecord processor
(https://issues.apache.org/jira/browse/NIFI-9814), the Jira doesn’t say which
version it went into but it is definitely in 1.19.1+. If that’s available to
you then you can just specify “2” as the range and it will only return that
line.

For total record count without loading the whole thing into memory, there’s
probably a more efficient way but you could use ConvertRecord and convert it
from CSV to CSV and it should write out the “record.count” attribute. I think
some/most/all record processors write this attribute, and they work record by
record so they don’t load the whole thing into memory. Even SampleRecord adds
a record.count attribute but if you specify one line the value will be 1 :)

Regards,
Matt

On Feb 9, 2023, at 6:57 PM, James McMahon <jsmcmah...@gmail.com> wrote:

Hello. I am trying to identify a header line and a data line count from a
flowfile that is in csv format.

Most of us are familiar with Matt B's outstanding Cookbook series, and I am
trying to use that as my starting point. Here is my Groovy code:

import org.apache.commons.io.IOUtils
import java.nio.charset.StandardCharsets
def ff=session.get()
if(!ff)return
try {
     def text = ''
     // Cast a closure with an inputStream parameter to InputStreamCallback
     session.read(ff, {inputStream ->
          text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
          // Do something with text here
          // get header from the second line of the flowfile
          // set datacount as the total line count of the file - 2 
          ...
          ff = session.putAttribute(ff, 'mdb.table.header', header)
          ff = session.putAttribute(ff, 'mdb.table.datarecords', datacount)
     } as InputStreamCallback)
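
For reference, one way to apply the point made at the top of this message is
to avoid session.write() entirely and use session.read() with an
InputStreamCallback. A minimal sketch, assuming the goal is only to capture
the header line as an attribute (this is not the original poster's final
code):

import org.apache.nifi.processor.io.InputStreamCallback
import java.nio.charset.StandardCharsets

def ff = session.get()
if (!ff) return
try {
    def header = ''
    session.read(ff, { inputStream ->
        // Only the first line is read; the rest of the stream is left untouched
        header = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8)).readLine()
    } as InputStreamCallback)
    ff = session.putAttribute(ff, 'mdb.table.header', header ?: '')
    session.transfer(ff, REL_SUCCESS)
} catch (Exception e) {
    log.error('Error occurred extracting header for table in mdb file', e)
    session.transfer(ff, REL_FAILURE)
}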

Re: How to cherry pick a specific line from a flowfile?

2023-02-09 Thread Matt Burgess
I’m AFK ATM but Range Sampling was added into the SampleRecord processor 
(https://issues.apache.org/jira/browse/NIFI-9814), the Jira doesn’t say which 
version it went into but it is definitely in 1.19.1+. If that’s available to 
you then you can just specify “2” as the range and it will only return that 
line.

For total record count without loading the whole thing into memory, there’s 
probably a more efficient way but you could use ConvertRecord and convert it 
from CSV to CSV and it should write out the “record.count” attribute. I think 
some/most/all record processors write this attribute, and they work record by 
record so they don’t load the whole thing into memory. Even SampleRecord adds a 
record.count attribute but if you specify one line the value will be 1 :)

Regards,
Matt


> On Feb 9, 2023, at 6:57 PM, James McMahon  wrote:
> 
> 
> Hello. I am trying to identify a header line and a data line count from a 
> flowfile that is in csv format.
> 
> Most of us are familiar with Matt B's outstanding Cookbook series, and I am 
> trying to use that as my starting point. Here is my Groovy code:
> 
> import org.apache.commons.io.IOUtils
> import java.nio.charset.StandardCharsets
> def ff=session.get()
> if(!ff)return
> try {
>  def text = ''
>  // Cast a closure with an inputStream parameter to InputStreamCallback
>  session.read(ff, {inputStream ->
>   text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
>   // Do something with text here
>   // get header from the second line of the flowfile
>   // set datacount as the total line count of the file - 2 
>   ...
>   ff = session.putAttribute(ff, 'mdb.table.header', header)
>   ff = session.putAttribute(ff, 'mdb.table.datarecords', datacount)
>  } as InputStreamCallback)
>  session.transfer(flowFile, REL_SUCCESS)
> } catch(e) {
>  log.error('Error occurred identifying tables in mdb file', e)
>  session.transfer(ff, REL_FAILURE)
> }
> 
> I want to avoid using that line in red, because as Matt cautions in his 
> cookbook, our csv files are too large. I do not want to read in the entire 
> file to variable text. It's going to be a problem.
> 
> How in Groovy can I cherry pick only the line I want from the stream (line #2 
> in this case)?
> 
> Also, how can I get a count of the total lines without loading them all into 
> text?
> 
> Thanks in advance for your help.


Re: PutSQL fragmented transaction error

2023-02-13 Thread Matt Burgess
As of Apache NiFi 1.13.0 [1], there are properties in
PutDatabaseRecord that can be used to specify the operation type as
well as the location of the "rows" in the input, to support the use
case of nested/structured input. You can add a field to each record
specifying the operation type ("insert", "update", "delete") and then
set Statement Type in PutDatabaseRecord to Use Record Path, then set
the Statement Type Record Path property to point at the field
containing the statement type. So for example let's say you have
"flat" records with fields x,y,stmt_type. You can set Statement Type
Record Path to /stmt_type and it will check each record for that field
and use its value as the statement type for handling that
record/"row".
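
For instance, with the x, y and stmt_type fields mentioned above, Statement
Type set to Use Record Path and Statement Type Record Path set to /stmt_type,
a JSON FlowFile like the following (illustrative values only) would insert
the first record, update the second and delete the third within a single
PutDatabaseRecord invocation:

[
  { "x": 1, "y": "a", "stmt_type": "INSERT" },
  { "x": 2, "y": "b", "stmt_type": "UPDATE" },
  { "x": 3, "y": "c", "stmt_type": "DELETE" }
]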

Please let us know if you have any questions or issues.

Regards,
Matt

[1] https://issues.apache.org/jira/browse/NIFI-8146

On Fri, Feb 10, 2023 at 1:17 PM João Marcos Polo Junior
 wrote:
>
> That's great to hear! Im will give it a try. What about performing different 
> operations (such as insert for a set of records, updates for another set of 
> record and the same for deletes) in a transaction? Is it possible with 
> PutDatabaseRecord?
>
> Em sex., 10 de fev. de 2023 às 14:29, Matt Burgess  
> escreveu:
>>
>> I agree with Chris here about using PutDatabaseRecord instead of the
>> Split and PutSQL. PutDatabaseRecord will process all records in a
>> FlowFile as a transaction, so in PutDatabaseRecord you can set an
>> AvroReader (to read the records coming out of ExecuteSQL) and the
>> statement type (such as INSERT) and it will process all those records
>> as one transaction.
>>
>> Regards,
>> Matt
>>
>> On Fri, Feb 10, 2023 at 12:04 PM João Marcos Polo Junior
>>  wrote:
>> >
>> > Thanks for replying it. I'm afraid this approach do not work with 
>> > transaction. How can I process all records in the same transaction?
>> >
>> > Em sex., 10 de fev. de 2023 às 13:05, Chris Sampson 
>> >  escreveu:
>> >>
>> >> I don't use the database/SQL processors much, but see questions about 
>> >> these on the Apache NiFi Slack channels quite regularly - you might have 
>> >> better look using ExecuteSQLRecord (can output in Avro or JSON, etc. 
>> >> using your wanted RecordSetWriter Controller Service) then feed that 
>> >> through to a PutDatabaseRecord (can read Avro, JSON, etc. depending upon 
>> >> your configured RecordReader).
>> >>
>> >> If you want to change the data in between then consider other Record 
>> >> based processors such as UpdateRecord or QueryRecord.
>> >>
>> >> On Fri, 10 Feb 2023, 15:57 João Marcos Polo Junior, 
>> >>  wrote:
>> >>>
>> >>> I’m trying to create a flow (nifi 1.18) to query a database 
>> >>> (ExecuteSQL), transform it records to json (ConvertAvroToJson), split it 
>> >>> into json objects (SplitJson) and then perform the necessary actions 
>> >>> into another database (PutSQL). All json objects splitted from the same 
>> >>> original flowfile needs to be processed in a transaction and for that 
>> >>> i’m using a PutSQL with Fragmented Transactions set it to true.
>> >>>
>> >>> First problem: I cant set the Transaction Timeout to more than “30 sec” 
>> >>> because my flowfiles (waiting in the upstream queue) dont ever get 
>> >>> processed and dont get to go to the failure connection. They stay stuck 
>> >>> in the upstream connection, get penalized, but never processed or 
>> >>> redirected to failure when the timeout (more than 30sec) reaches the end.
>> >>>
>> >>>
>> >>>
>> >>> Second problem: I want to combine the Transaction Timeout attribute with 
>> >>> the Penalty Time, Yield Time or maybe Run Schedule but thats not working 
>> >>> either.
>> >>>
>> >>> Is there a solution for these problem? Is there something I have to 
>> >>> configure in the DBCPConnectionPool for that to work?
>> >>>
>> >>> Here’s a similar problem in version 1.12: 
>> >>> https://issues.apache.org/jira/browse/NIFI-8733
>> >>>
>> >>>
>> >>>
>> >>> Thanks in advance!
>> >>>
>> >>>


Re: PutSQL fragmented transaction error

2023-02-10 Thread Matt Burgess
I agree with Chris here about using PutDatabaseRecord instead of the
Split and PutSQL. PutDatabaseRecord will process all records in a
FlowFile as a transaction, so in PutDatabaseRecord you can set an
AvroReader (to read the records coming out of ExecuteSQL) and the
statement type (such as INSERT) and it will process all those records
as one transaction.

Regards,
Matt

On Fri, Feb 10, 2023 at 12:04 PM João Marcos Polo Junior
 wrote:
>
> Thanks for replying it. I'm afraid this approach do not work with 
> transaction. How can I process all records in the same transaction?
>
> Em sex., 10 de fev. de 2023 às 13:05, Chris Sampson 
>  escreveu:
>>
>> I don't use the database/SQL processors much, but see questions about these 
>> on the Apache NiFi Slack channels quite regularly - you might have better 
>> look using ExecuteSQLRecord (can output in Avro or JSON, etc. using your 
>> wanted RecordSetWriter Controller Service) then feed that through to a 
>> PutDatabaseRecord (can read Avro, JSON, etc. depending upon your configured 
>> RecordReader).
>>
>> If you want to change the data in between then consider other Record based 
>> processors such as UpdateRecord or QueryRecord.
>>
>> On Fri, 10 Feb 2023, 15:57 João Marcos Polo Junior,  
>> wrote:
>>>
>>> I’m trying to create a flow (nifi 1.18) to query a database (ExecuteSQL), 
>>> transform it records to json (ConvertAvroToJson), split it into json 
>>> objects (SplitJson) and then perform the necessary actions into another 
>>> database (PutSQL). All json objects splitted from the same original 
>>> flowfile needs to be processed in a transaction and for that i’m using a 
>>> PutSQL with Fragmented Transactions set it to true.
>>>
>>> First problem: I cant set the Transaction Timeout to more than “30 sec” 
>>> because my flowfiles (waiting in the upstream queue) dont ever get 
>>> processed and dont get to go to the failure connection. They stay stuck in 
>>> the upstream connection, get penalized, but never processed or redirected 
>>> to failure when the timeout (more than 30sec) reaches the end.
>>>
>>>
>>>
>>> Second problem: I want to combine the Transaction Timeout attribute with 
>>> the Penalty Time, Yield Time or maybe Run Schedule but thats not working 
>>> either.
>>>
>>> Is there a solution for these problem? Is there something I have to 
>>> configure in the DBCPConnectionPool for that to work?
>>>
>>> Here’s a similar problem in version 1.12: 
>>> https://issues.apache.org/jira/browse/NIFI-8733
>>>
>>>
>>>
>>> Thanks in advance!
>>>
>>>


Re: Adjusting FlattenJson output

2023-06-01 Thread Matt Burgess
Jim,

I tried to use Jolt for this but I found in the doc that if you try to
set an empty array or map to null or the empty string it will retain
the empty array or map (no idea why). Since you know the name of the
fields (and I assume want to keep the schema intact) you can use
ScriptedTransformRecord to get at those fields by name and set them to
null.
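
A hedged sketch of what that ScriptedTransformRecord body could look like in
Groovy, assuming a JSON record reader is applied to the flattened output and
using the two field names from the example further down this thread:

['meta.view.clientContext.clientContextVariables',
 'meta.view.clientContext.inheritedVariables'].each { name ->
    record.setValue(name, null)
}
return record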

Regards,
Matt

On Mon, May 29, 2023 at 10:38 AM James McMahon  wrote:
>
> I have incoming JSON data that begins like this, and that I am trying to 
> flatten with FlattenJSON v1.16.3:
>
> {
>   "meta" : {
> "view" : {
>   "id" : "kku6-nxdu",
>   "name" : "Demographic Statistics By Zip Code",
>   "assetType" : "dataset",
>   "attribution" : "Department of Youth and Community Development (DYCD)",
>   "averageRating" : 0,
>   "category" : "City Government",
>   "createdAt" : 1311775554,
>   "description" : "Demographic statistics broken down by zip code",
>   "displayType" : "table",
>   "downloadCount" : 1017553,
>   "hideFromCatalog" : false,
>   "hideFromDataJson" : false,
>   "indexUpdatedAt" : 1536596131,
>   "newBackend" : true,
>   "numberOfComments" : 3,
>   "oid" : 4208790,
>   "provenance" : "official",
>   "publicationAppendEnabled" : false,
>   "publicationDate" : 1372266760,
>   "publicationGroup" : 238846,
>   "publicationStage" : "published",
>   "rowClass" : "",
>   "rowsUpdatedAt" : 1372266747,
>   "rowsUpdatedBy" : "uurm-7z6x",
>   "tableId" : 942474,
>   "totalTimesRated" : 0,
>   "viewCount" : 70554,
>   "viewLastModified" : 1652135219,
>   "viewType" : "tabular",
>   "approvals" : [ {
> "reviewedAt" : 1372266760,
> "reviewedAutomatically" : true,
> "state" : "approved",
> "submissionId" : 1064760,
> "submissionObject" : "public_audience_request",
> "submissionOutcome" : "change_audience",
> "submittedAt" : 1372266760,
> "workflowId" : 2285,
> "submissionDetails" : {
>   "permissionType" : "READ"
> },
> "submissionOutcomeApplication" : {
>   "failureCount" : 0,
>   "status" : "success"
> },
> "submitter" : {
>   "id" : "5fuc-pqz2",
>   "displayName" : "NYC OpenData"
> }
>   } ],
>   "clientContext" : {
> "clientContextVariables" : [ ],
> "inheritedVariables" : { }
>   },
>   "columns" : [ {
> "id" : -1,
> "name" : "sid",
> "dataTypeName" : "meta_data",
> "fieldName" : ":sid",
> "position" : 0,
> "renderTypeName" : "meta_data",
> "format" : { },
> "flags" : [ "hidden" ]
>   }, { .
>
> This is my configuration of my FlattenJson processor:
> Separator  .
> Flatten modenormal
> Ignore Reserved Charactersfalse
> Return Type  flatten
> Character Set   UTF-8
> Pretty Print JSON true
>
> Those lines in red appear in my output like this:
>   "meta.view.clientContext.clientContextVariables" : [  ],
>   "meta.view.clientContext.inheritedVariables" : {
>
>   },
>
> I don't want to preserve the empty list and empty map. I want to set the 
> values for these keys to the empty string or null is acceptable.
>
> Can I do this natively through the FlattenJson configuration? If not, what 
> would be the most efficient means to post-process to what I seek in my flow?
>
> Thanks in advance for any help.


Re: Transformation: Custom column mapping with SQL Tables (Source > Target Tables)

2023-06-21 Thread Matt Burgess
Kyrindor,

Can you provide an example of the kind of mapping you want to do? It
sounds like UpdateRecord [1] should work to change input fields to
output fields. For joining we offer "enrichment" and "lookup"
components that can add fields based on some other field value(s).

Regards,
Matt

[1] 
https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.22.0/org.apache.nifi.processors.standard.UpdateRecord/index.html

On Mon, Jun 19, 2023 at 5:44 AM Kyrindorx  wrote:
>
> Hello all,
>
> I would like to use Apache Nifi to build a data pipeline with dynamic
> processors. The user must be able to use e.g. processor steps with table
> column mapping to transform data from a source table into intermediate
> tables and the final table.
>
> The user requires a free configuration of the column mapping from source
> table to target table.
>
> Can processors be offered customizable like this? Which SQL table
> processors with mapping, join does Nifi offer? And how flexible can
> these be set by the user?
>
> The question is a bit difficult for me, because I don't know the
> possibilities of Nifi yet and have to approach the topic.
>
> Thx in advance
> Kyrindor
>
>
>
>


Re: Transformation: Custom column mapping with SQL Tables (Source > Target Tables)

2023-06-23 Thread Matt Burgess
There's no UI in NiFi for that but doing a mapping in UpdateRecord is
similar, add user-defined properties to map one field to another.
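
As a concrete illustration (field names made up): with Replacement Value
Strategy set to "Record Path Value", each user-defined property maps a
destination field to the source field it is copied from, for example a
property named /customer_name with the value /CUST_NM, and another named
/order_total with the value /ORD_AMT. How brand-new destination fields behave
also depends on the schema used by the record writer, so that part is worth
testing against your data.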

On Wed, Jun 21, 2023 at 7:05 PM Rafael Fracasso
 wrote:
>
> I think that he wants something like a SSIS approach mapping from the UI the 
> source -> destination
>
> On Wed, Jun 21, 2023 at 11:36 AM Matt Burgess  wrote:
>>
>> Kyrindor,
>>
>> Can you provide an example of the kind of mapping you want to do? It
>> sounds like UpdateRecord [1] should work to change input fields to
>> output fields. For joining we offer "enrichment" and "lookup"
>> components that can add fields based on some other field value(s).
>>
>> Regards,
>> Matt
>>
>> [1] 
>> https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.22.0/org.apache.nifi.processors.standard.UpdateRecord/index.html
>>
>> On Mon, Jun 19, 2023 at 5:44 AM Kyrindorx  wrote:
>> >
>> > Hello all,
>> >
>> > I would like to use Apache Nifi to build a data pipeline with dynamic
>> > processors. The user must be able to use e.g. processor steps with table
>> > column mapping to transform data from a source table into intermediate
>> > tables and the final table.
>> >
>> > The user requires a free configuration of the column mapping from source
>> > table to target table.
>> >
>> > Can processors be offered customizable like this? Which SQL table
>> > processors with mapping, join does Nifi offer? And how flexible can
>> > these be set by the user?
>> >
>> > The question is a bit difficult for me, because I don't know the
>> > possibilities of Nifi yet and have to approach the topic.
>> >
>> > Thx in advance
>> > Kyrindor
>> >
>> >
>> >
>> >


Re: How to determine groovy version

2023-05-13 Thread Matt Burgess
Jim,

You can find out in Github [1] or from your installation you can do
(substituting your NiFi version in the NAR name):

jar -tvf lib/nifi-scripting-nar-1.16.3.nar | grep groovy

Regards,
Matt

[1] 
https://github.com/apache/nifi/blob/rel/nifi-1.16.3/nifi-nar-bundles/nifi-scripting-bundle/pom.xml#L34

On Sat, May 13, 2023 at 11:05 AM James McMahon  wrote:
>
> I'm using a series of Groovy scripts running from NiFi v1.16.3 ExecuteScript. 
> How can I determine which version of Groovy is baked into NiFi v1.16.3?


Re: Access instance name

2024-02-13 Thread Matt Burgess
Etienne,

What instance name / id are you referring to?

On Tue, Feb 13, 2024 at 8:43 AM Etienne Jouvin 
wrote:

> Hello all.
>
> Just simple question, is there a way to access, from controller service,
> to the instance name / id event in not cluster implementation?
>
> Regards
>
> Etienne Jouvin
>


Re: Exception reason

2023-12-26 Thread Matt Burgess
It looks like we need to call release() from some place(s) where we
don't currently. HBase had the same issues [1]. We use this in the
StandardMapCacheServer and ListenBeats, are you using either, neither,
or both?

Regards,
Matt

[1] https://github.com/netty/netty/issues/12549

On Tue, Dec 26, 2023 at 8:53 AM Rafael Fracasso
 wrote:
>
> Hi,
>
> sometimes I have these ResourceLeakDetector exceptions on my nifi-app.log, 
> you guys know the reason this exception was raised and how I avoid them to 
> happen?
>
> Thanks
>
> Rafael
>
>
>
> 2023-12-26 09:02:15,353 ERROR [nioEventLoopGroup-3-6] 
> io.netty.util.ResourceLeakDetector LEAK: ByteBuf.release() was not called 
> before it's garbage-collected. See 
> https://netty.io/wiki/reference-counted-objects.html for more information.
>
> Recent access records:
>
> Created at:
>
> io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:403)
>
> io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:188)
>
> io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:124)
>
> io.netty.buffer.AbstractByteBuf.readBytes(AbstractByteBuf.java:871)
>
> org.apache.nifi.distributed.cache.server.codec.CacheRequestDecoder.readHeader(CacheRequestDecoder.java:238)
>
> org.apache.nifi.distributed.cache.server.codec.CacheRequestDecoder.decode(CacheRequestDecoder.java:80)
>
> io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:519)
>
> io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:458)
>
> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:280)
>
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>
> io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
>
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>
> io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
>
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>
> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
>
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
>
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
>
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
>
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
>
> io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
>
> io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
>
> io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>
> io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
>
> java.base/java.lang.Thread.run(Thread.java:833)


Re: Extract from jars and nars

2023-12-31 Thread Matt Burgess
Jim,

When you say you want to "avoid having to output them to a temp
directory", does that include the content repo? If not you can use
UnpackContent with a Packaging Type of zip. I tried on both JARs and
NARs and it works.

Regards,
Matt

On Sun, Dec 31, 2023 at 12:37 PM James McMahon  wrote:
>
> I have a NiFi flow that handles many jar and nar archive files as incoming 
> flowfiles. I am trying to figure out a way I can extract files from these 
> archives - for example, in most cases one incoming jar has a number of files 
> in its archive. So one flowfile should yield N output flowfiles if there are 
> N files in the archive.
>
> I do not have /usr/bin/jar on my system. I have read, though, that unzip can 
> be employed to extract from jars, and I have that. So I am trying to use that.
>
> How can I configure an ExecuteStreamCommand processor to take an incoming 
> flowfile as stdin, and output each member of the archive as one of N output 
> flowfiles to stdout? Ideally I want to avoid having to output my streaming 
> flowfile to a temporary  physical directory; I want to perform the extraction 
> entirely in stream.
>
> I have used ExecuteStreamCommand before but can't recall how to get it to 
> work for this use case.
>
> Thanks for any help.


Re: Hardware requirement for NIFI instance

2024-01-04 Thread Matt Burgess
If I remember correctly, the default Fetch Size for Postgresql is to
get all the rows at once, which can certainly cause the problem.
Perhaps try setting Fetch Size to something like 1000 or so and see if
that alleviates the problem.

Regards,
Matt

On Thu, Jan 4, 2024 at 8:48 AM Etienne Jouvin  wrote:
>
> Hello.
>
> I also think the problem is more about the processor, I guess ExecuteSQL.
>
> Should play with batch configuration and commit flag to commit intermediate 
> FlowFile.
>
> The out of memory exception makes me believe the full table is retrieved, and 
> if it is huge the FlowFile content is very large.
>
>
>
>
> Le jeu. 4 janv. 2024 à 14:37, Pierre Villard  a 
> écrit :
>>
>> It should be memory efficient so I think this is likely a configuration 
>> aspect of your processor. Can you share the configuration for all properties?
>> As a side note: if NiFi ran out of memory, you'd always want to restart it 
>> because you are never sure what's the state of the JVM after an OOME.
>>
>> Le jeu. 4 janv. 2024 à 17:26,  a écrit :
>>>
>>>
>>> Hello all,
>>>
>>> Who could help me to determine the cpu/memory need for nifi instance to 
>>> fetch the data from Postgresql hosted in google ?
>>>
>>> We got this error :
>>> ==> Error : executesql.error.message
>>> Ran out of memory retrieving query results.
>>>
>>> The procesor ExecuteSQL has this config : Set Auto Commit ==> false
>>> driver Jar to use : postgresql-42.7.1.jar
>>> Java version : jdk-11.0.19
>>>
>>> Table information :
>>> rows number : 14958836
>>> fields number : 20
>>>
>>> Linux Rocky8
>>>
>>> Architecture:x86_64
>>> CPU op-mode(s):  32-bit, 64-bit
>>> Byte Order:  Little Endian
>>> CPU(s):  2
>>> On-line CPU(s) list: 0,1
>>> Thread(s) per core:  2
>>> Core(s) per socket:  1
>>> Socket(s):   1
>>> NUMA node(s):1
>>> Vendor ID:   GenuineIntel
>>> BIOS Vendor ID:  Google
>>> CPU family:  6
>>> Model:   85
>>> Model name:  Intel(R) Xeon(R) CPU @ 2.80GHz
>>> Stepping:7
>>> CPU MHz: 2800.286
>>> BogoMIPS:5600.57
>>> Hypervisor vendor:   KVM
>>> Virtualization type: full
>>> L1d cache:   32K
>>> L1i cache:   32K
>>> L2 cache:1024K
>>> L3 cache:33792K
>>> NUMA node0 CPU(s):   0,1
>>>
>>> Memory : 8GB
>>>
>>> Thanks for you helps
>>>
>>> Minh


Re: Hardware requirement for NIFI instance

2024-01-04 Thread Matt Burgess
You may not need to merge if your Fetch Size is set appropriately. For
your case I don't recommend setting Max Rows Per Flow File because you
still have to wait for all the results to be processed before the
FlowFile(s) get sent "downstream". Also if you set Output Batch Size
you can't use Merge downstream as ExecuteSQL will send FlowFiles
downstream before it knows the total count.

If you have a NiFi cluster and not a standalone instance you MIGHT be
able to represent your complex query using GenerateTableFetch and use
a load-balanced connection to grab different "pages" of the table in
parallel with ExecuteSQL. Those can be merged later as long as you get
all the FlowFiles back to a single node. Depending on how complex your
query is then it's a long shot but I thought I'd mention it just in
case.

Regards,
Matt


On Thu, Jan 4, 2024 at 1:41 PM Pierre Villard
 wrote:
>
> You can merge multiple Avro flow files with MergeRecord with an Avro Reader 
> and an Avro Writer
>
> Le jeu. 4 janv. 2024 à 22:05,  a écrit :
>>
>> And the important thing for us it has only one avro file by table.
>>
>> So it is possible to merge avro files to one avro file ?
>>
>> Regards
>>
>>
>> Envoyé: jeudi 4 janvier 2024 à 19:01
>> De: e-soci...@gmx.fr
>> À: users@nifi.apache.org
>> Cc: users@nifi.apache.org
>> Objet: Re: Hardware requirement for NIFI instance
>>
>> Hello all,
>>
>> Thanks a lot for the reply.
>>
>> So for more details.
>>
>> All the properties for the ExecuteSQL are set by default, except "Set Auto 
>> Commit:  false".
>>
>> The sql command could not be more simple than "select * from 
>> ${db.table.fullname}"
>>
>> The nifi version is 1.16.3 and 1.23.2
>>
>> I have also test the same sql command in the another nifi (8 cores/ 16G Ram) 
>> and it is working.
>> The result is the avro file with 1.6GB
>>
>> The detail about the output flowfile :
>>
>> executesql.query.duration
>> 245118
>> executesql.query.executiontime
>> 64122
>> executesql.query.fetchtime
>> 180996
>> executesql.resultset.index
>> 0
>> executesql.row.count
>> 14961077
>>
>> File Size
>> 1.62 GB
>>
>> Regards
>>
>> Minh
>>
>>
>> Envoyé: jeudi 4 janvier 2024 à 17:18
>> De: "Matt Burgess" 
>> À: users@nifi.apache.org
>> Objet: Re: Hardware requirement for NIFI instance
>> If I remember correctly, the default Fetch Size for Postgresql is to
>> get all the rows at once, which can certainly cause the problem.
>> Perhaps try setting Fetch Size to something like 1000 or so and see if
>> that alleviates the problem.
>>
>> Regards,
>> Matt
>>
>> On Thu, Jan 4, 2024 at 8:48 AM Etienne Jouvin  
>> wrote:
>> >
>> > Hello.
>> >
>> > I also think the problem is more about the processor, I guess ExecuteSQL.
>> >
>> > Should play with batch configuration and commit flag to commit 
>> > intermediate FlowFile.
>> >
>> > The out of memory exception makes me believe the full table is retrieved, 
>> > and if it is huge the FlowFile content is very large.
>> >
>> >
>> >
>> >
>> > Le jeu. 4 janv. 2024 à 14:37, Pierre Villard  
>> > a écrit :
>> >>
>> >> It should be memory efficient so I think this is likely a configuration 
>> >> aspect of your processor. Can you share the configuration for all 
>> >> properties?
>> >> As a side note: if NiFi ran out of memory, you'd always want to restart 
>> >> it because you are never sure what's the state of the JVM after an OOME.
>> >>
>> >> Le jeu. 4 janv. 2024 à 17:26,  a écrit :
>> >>>
>> >>>
>> >>> Hello all,
>> >>>
>> >>> Who could help me to determine the cpu/memory need for nifi instance to 
>> >>> fetch the data from Postgresql hosted in google ?
>> >>>
>> >>> We got this error :
>> >>> ==> Error : executesql.error.message
>> >>> Ran out of memory retrieving query results.
>> >>>
>> >>> The procesor ExecuteSQL has this config : Set Auto Commit ==> false
>> >>> driver Jar to use : postgresql-42.7.1.jar
>> >>> Java version : jdk-11.0.19
>> >>>
>> >>> Table information :
>> >>> rows number : 14958836
>> >>> fields number : 20
>> >>>
>> >>> Linux Rocky8
>> >>>
>> >>> Architecture: x86_64
>> >>> CPU op-mode(s): 32-bit, 64-bit
>> >>> Byte Order: Little Endian
>> >>> CPU(s): 2
>> >>> On-line CPU(s) list: 0,1
>> >>> Thread(s) per core: 2
>> >>> Core(s) per socket: 1
>> >>> Socket(s): 1
>> >>> NUMA node(s): 1
>> >>> Vendor ID: GenuineIntel
>> >>> BIOS Vendor ID: Google
>> >>> CPU family: 6
>> >>> Model: 85
>> >>> Model name: Intel(R) Xeon(R) CPU @ 2.80GHz
>> >>> Stepping: 7
>> >>> CPU MHz: 2800.286
>> >>> BogoMIPS: 5600.57
>> >>> Hypervisor vendor: KVM
>> >>> Virtualization type: full
>> >>> L1d cache: 32K
>> >>> L1i cache: 32K
>> >>> L2 cache: 1024K
>> >>> L3 cache: 33792K
>> >>> NUMA node0 CPU(s): 0,1
>> >>>
>> >>> Memory : 8GB
>> >>>
>> >>> Thanks for you helps
>> >>>
>> >>> Minh
>>
>>


Re: Pattern advice - files on disk into a record field

2024-01-04 Thread Matt Burgess
I think the hard part here is taking a "raw" file like PDF bytes and
creating a record in a certain format. For now I think ScriptedReader
is your best bet, you can read the entire input stream in as a byte
array then return a Record that contains a "bytes" field containing
that data. You can create the schema for the record(s) in the script.
Then whatever writer (AvroRecordSetWriter in this case?) will write it
out in that format. I recommend a binary format like Avro instead of a
text-based one like JSON to ensure the bytes don't get mangled.
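
To illustrate the shape of it outside NiFi (a sketch only; the schema, field
and file names are made up), wrapping a file's bytes in a one-field Avro
record looks roughly like this in Groovy with the Avro library. In the flow
described above, the ScriptedReader would build the equivalent in-memory
Record and the configured AvroRecordSetWriter would handle the file format:

import org.apache.avro.Schema
import org.apache.avro.file.DataFileWriter
import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericDatumWriter
import java.nio.ByteBuffer

def schema = new Schema.Parser().parse('''
{ "type": "record", "name": "document",
  "fields": [ { "name": "content", "type": "bytes" } ] }
''')

def record = new GenericData.Record(schema)
// Avro "bytes" fields take a ByteBuffer, which keeps the PDF content binary-safe
record.put('content', ByteBuffer.wrap(new File('/tmp/input.pdf').bytes))

def writer = new DataFileWriter(new GenericDatumWriter(schema))
writer.create(schema, new File('/tmp/out.avro'))
writer.append(record)
writer.close()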

If we don't have a "bytes record reader" or something similar, we
should look into adding it, I think most of the readers (even
GrokReader IIRC) are text-based so we may not have something built-in
that can take the content of a Flow File and turn it into a one-field
Record of bytes. I have a stale PR for a text-based line reader [1]
but you can use GrokReader for that so I let it go stale. Maybe I
should do a similar one for binary data.

Regards,
Matt

[1] https://github.com/apache/nifi/pull/3735

On Wed, Jan 3, 2024 at 7:20 PM Richard Beare  wrote:
>
> Any insights on this question post break? I think my problem can be 
> summarised as looking for the right way to place binary data, stored as a 
> on-disk file into a field of an avro record
>
> On Wed, Dec 20, 2023 at 5:06 PM Richard Beare  wrote:
>>
>> I think I've made some progress with this, but I'm now having trouble with 
>> pdf files. The approach that seems to partly solve the problem is to have a 
>> ConvertRecord processor with a scripted reader to place the on disk (as 
>> delivered by the GetFile processor) into a record field. I can then use an 
>> UpdateRecord to add other fields. My current problem, I think, is correctly 
>> dealing with dumping a binary object (e.g. a pdf file) into that field. 
>> Going via strings worked for html files but breaks pdfs. I'm struggling with 
>> how to correctly set up the schema from within the script.
>>
>> On Tue, Dec 19, 2023 at 12:31 PM Richard Beare  
>> wrote:
>>>
>>> Hi,
>>> I've gotten rusty, not having done much nifi work for a while.
>>>
>>> I want to run some tests of the following scenario. I have a workflow that 
>>> takes documents from a DB and feeds them through tika. I want to test with 
>>> a different document set that is currently living on disk. The tika 
>>> (groovy) processor that is my front end is expecting a record with a number 
>>> of fields, one of which is the document content.
>>>
>>> I can simulate the fields (badly, but that doesn't matter at this stage), 
>>> with generate record, but how do I get document contents from disk into the 
>>> right place. I've been thinking of using updaterecord to modify the random 
>>> records, but can't see how to get the data from GetFile into the right 
>>> place.
>>>
>>> Another thought is that perhaps I need to convert the GetFile output into 
>>> the right record structure with convertrecord, but then how to fill the 
>>> other fields.
>>>
>>> What am I missing here?


Re: How to upgrade from 1.24.0 to 2.0.0-M1

2023-12-06 Thread Matt Burgess
Indeed it looks like someone else has run into this:

https://stackoverflow.com/questions/77615582/apache-nifi-2-x-org-eclipse-jetty-http-badmessageexception-400-invalid-sni


On Wed, Dec 6, 2023 at 10:05 PM Adam Taft  wrote:

> David,
>
> Any chance that the Jetty SNI related information could also end up in the
> migration guide? I suspect that both of these issues are going to be
> frequently asked as folks evaluate NiFi 2.0 (especially with a 1.x
> migration).
>
> Thanks for holding everyone's hand here. Much appreciated!
>
> /Adam
>
>
> On Fri, Dec 1, 2023 at 5:45 AM David Handermann <
> exceptionfact...@apache.org> wrote:
>
>> Ben,
>>
>> Thanks for the additional details on the HTTP 400 Invalid SNI error.
>>
>> Jetty 10, which is included with NiFi 2.0.0-M1, incorporates updates to
>> the Server Name Indication processing during the TLS handshake. As a result
>> of these changes, the default behavior does not support accessing NiFi
>> using an IP address. Using a hostname or DNS name will avoid the SNI error
>> and allow standard TLS negotiation to work.
>>
>> Regards,
>> David Handermann
>>
>> On Thu, Nov 30, 2023 at 12:07 AM Ben .T.George 
>> wrote:
>>
>>> HI,
>>>
>>> Thanks for the update, i have changed and started the process, now the
>>> port 8443 is listening , i cannot able to access web url. while i am trying
>>> , i am getting below error on browser:
>>>
>>> HTTP ERROR 400 Invalid SNI
>>> URI: /nifi
>>> STATUS: 400
>>> MESSAGE: Invalid SNI
>>> SERVLET: -
>>> CAUSED BY: org.eclipse.jetty.http.BadMessageException: 400: Invalid SNI
>>> Caused by:
>>>
>>> org.eclipse.jetty.http.BadMessageException: 400: Invalid SNI
>>> at 
>>> org.eclipse.jetty.server.SecureRequestCustomizer.customize(SecureRequestCustomizer.java:266)
>>> at 
>>> org.eclipse.jetty.server.SecureRequestCustomizer.customize(SecureRequestCustomizer.java:207)
>>> at 
>>> org.eclipse.jetty.server.HttpChannel$RequestDispatchable.dispatch(HttpChannel.java:1594)
>>> at org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:753)
>>> at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:501)
>>> at org.eclipse.jetty.server.HttpChannel.run(HttpChannel.java:461)
>>> at 
>>> org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy.runTask(AdaptiveExecutionStrategy.java:421)
>>> at 
>>> org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy.consumeTask(AdaptiveExecutionStrategy.java:390)
>>> at 
>>> org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy.tryProduce(AdaptiveExecutionStrategy.java:277)
>>> at 
>>> org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy.produce(AdaptiveExecutionStrategy.java:193)
>>> at 
>>> org.eclipse.jetty.http2.HTTP2Connection.produce(HTTP2Connection.java:208)
>>> at 
>>> org.eclipse.jetty.http2.HTTP2Connection.onFillable(HTTP2Connection.java:155)
>>> at 
>>> org.eclipse.jetty.http2.HTTP2Connection$FillableCallback.succeeded(HTTP2Connection.java:450)
>>> at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:100)
>>> at 
>>> org.eclipse.jetty.io.ssl.SslConnection$DecryptedEndPoint.onFillable(SslConnection.java:558)
>>> at 
>>> org.eclipse.jetty.io.ssl.SslConnection.onFillable(SslConnection.java:379)
>>> at 
>>> org.eclipse.jetty.io.ssl.SslConnection$2.succeeded(SslConnection.java:146)
>>> at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:100)
>>> at 
>>> org.eclipse.jetty.io.SelectableChannelEndPoint$1.run(SelectableChannelEndPoint.java:53)
>>> at 
>>> org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy.runTask(AdaptiveExecutionStrategy.java:421)
>>> at 
>>> org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy.consumeTask(AdaptiveExecutionStrategy.java:390)
>>> at 
>>> org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy.tryProduce(AdaptiveExecutionStrategy.java:277)
>>> at 
>>> org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy.run(AdaptiveExecutionStrategy.java:199)
>>> at 
>>> org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:411)
>>> at 
>>> org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:969)
>>> at 
>>> org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.doRunJob(QueuedThreadPool.java:1194)
>>> at 
>>> org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1149)
>>> at java.base/java.lang.Thread.run(Thread.java:1583)
>>>
>>>
>>> and from app.log:
>>>
>>> 2023-11-30 09:03:25,995 INFO [main] o.apache.nifi.controller.FlowController 
>>> Starting 0 Stateless Process Groups
>>> 2023-11-30 09:03:25,995 INFO [main] o.apache.nifi.controller.FlowController 
>>> Starting 0 processors/ports/funnels
>>> 2023-11-30 09:03:25,995 INFO [main] o.apache.nifi.controller.FlowController 
>>> Started 0 Remote Group Ports transmitting
>>> 2023-11-30 09:03:26,007 INFO [main] 
>>> 

Re: java.lang.OutOfMemoryError: Java heap space : NIFI 1.23.2

2024-03-19 Thread Matt Burgess
Specifically set Fetch Size to something like 1000, by default setting
Fetch Size to zero will cause Postgres to fetch the entire ResultSet into
memory [1]. We should probably change that default to avoid problems like
this and with other drivers (for example, Oracle's default is 10 rows which
is often inadequate).

Regards,
Matt

[1]
https://franckpachot.medium.com/oracle-postgres-jdbc-fetch-size-3012d494712


On Tue, Mar 19, 2024 at 9:56 AM Joe Witt  wrote:

> Hello
>
> The key output is
>
> java.lang.OutOfMemoryError: Java heap space
>
> Review batch property options to limit response sizes in the database
> calls.
>
> Thanks
>
>
> On Tue, Mar 19, 2024 at 6:15 AM  wrote:
>
>> Hello
>>
>> I got the executeSQL processor does the sql command "select * from
>> public.table1"
>> It is a postgresql database.
>>
>> Here the end of properties of processor.
>>
>> Max Wait Time 0 seconds
>> Normalize Table/Column Names false
>> Use Avro Logical Types false
>> Compression Format NONE
>> Default Decimal Precision 10
>> Default Decimal Scale 0
>> Max Rows Per Flow File 0
>> Output Batch Size 0
>> Fetch Size: 20
>> Set Auto Commit :false
>>
>>
>> The same SQL command works in NiFi 1.16.3 with the same configuration.
>>
>> I don't know why it fails. Thanks
>>
>> I need your help... there is a strange error:
>>
>> 2024-03-19 12:58:37,683 ERROR [Load-Balanced Client Thread-6]
>> org.apache.nifi.engine.FlowEngine Uncaught Exception in Runnable task
>> java.lang.OutOfMemoryError: Java heap space
>> 2024-03-19 12:58:37,684 ERROR [Load-Balanced Client Thread-2]
>> org.apache.nifi.engine.FlowEngine Uncaught Exception in Runnable task
>> java.lang.OutOfMemoryError: Java heap space
>> at java.base/java.util.concurrent.CopyOnWriteArrayList.iterator(CopyOnWriteArrayList.java:1024)
>> at java.base/java.util.concurrent.CopyOnWriteArraySet.iterator(CopyOnWriteArraySet.java:389)
>> at org.apache.nifi.controller.queue.clustered.client.async.nio.NioAsyncLoadBalanceClientTask.run(NioAsyncLoadBalanceClientTask.java:54)
>> at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
>> at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>> at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
>> at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>> at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>> at java.base/java.lang.Thread.run(Thread.java:834)
>> 2024-03-19 12:58:48,940 INFO [main-EventThread]
>> o.a.c.f.state.ConnectionStateManager State change: SUSPENDED
>> 2024-03-19 12:58:49,347 ERROR [Timer-Driven Process Thread-9]
>> org.apache.nifi.engine.FlowEngine Uncaught Exception in Runnable task
>> java.lang.OutOfMemoryError: Java heap space
>> 2024-03-19 12:58:49,351 INFO [NiFi Web Server-5835]
>> org.apache.nifi.web.server.RequestLog 138.21.169.37 - CN=admin.plants,
>> OU=NIFI [19/Mar/2024:12:58:48 +] "GET /nifi-api/flow/cluster/summary
>> HTTP/1.1" 200 104 "
>> https://nifi-01:9091/nifi/?processGroupId=0f426c92-018e-1000--3fca1e11=;
>> "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML,
>> like Gecko) Chrome/105.0.0.0 Safari/537.36"
>> 2024-03-19 12:58:48,559 ERROR [Load-Balanced Client Thread-3]
>> org.apache.nifi.engine.FlowEngine Uncaught Exception in Runnable task
>> java.lang.OutOfMemoryError: Java heap space
>> 2024-03-19 12:58:49,745 INFO [NiFi Web Server-5419]
>> org.apache.nifi.web.server.RequestLog 138.21.169.37 - -
>> [19/Mar/2024:12:58:49 +] "POST
>> /rb_bf28073qyu?type=js3=v_4_srv_8_sn_B1E58A3741D2949DD454A88FF8A4BAF3_perc_10_ol_0_mul_1_app-3A4e195de4d0714591_1_app-3A44074a8878754fd3_1_app-3Ad1603d0792f56d4b_1_app-3A8092cfc902bb1761_1_rcs-3Acss_1=8=post=QKHPLOMPDHAQNPHMHHTUOHHKWJHFRNCG-0=1710851904127=https%3A%2F%2Fnifi-01%3A9091%2Fnifi%2F%3FprocessGroupId%3D0f426c92-018e-1000--3fca1e11%26componentIds%3D=3=d1603d0792f56d4b=1268897524=7xpdnw1j=1
>> HTTP/1.1" 200 109 "
>> https://nifi-01:9091/nifi/?processGroupId=0f426c92-018e-1000--3fca1e11=;
>> "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML,
>> like Gecko) Chrome/105.0.0.0 Safari/537.36"
>> 2024-03-19 12:58:57,419 ERROR [Timer-Driven Process Thread-5]
>> org.apache.nifi.engine.FlowEngine Uncaught Exception in Runnable task
>> java.lang.OutOfMemoryError: Java heap space
>> 2024-03-19 12:58:50,209 WARN [NiFi Web Server-5689]
>> o.a.n.c.l.e.CuratorLeaderElectionManager Unable to determine leader for
>> role 'Cluster Coordinator'; returning null
>> org.apache.zookeeper.KeeperException$ConnectionLossException:
>> KeeperErrorCode = ConnectionLoss for /nifi/clu_quality_2/leaders/Cluster
>> Coordinator
>> at
>> 

Re: java.lang.OutOfMemoryError: Java heap space : NIFI 1.23.2

2024-03-22 Thread Matt Burgess
For completeness, this can also affect the QueryDatabaseTable processors
[1]. This will be fixed in the next release(s).

Regards,
Matt

[1] https://issues.apache.org/jira/browse/NIFI-1931
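
As a rough back-of-envelope check (using the figures reported further down in
this thread, and treating the Avro output size as only a proxy for what the
driver holds per row in memory), one way to pick a Fetch Size that fits the
heap:

    public class FetchSizeEstimate {
        public static void main(String[] args) {
            // Figures from later in this thread: ~7.6 GB of Avro output for ~161,000 rows.
            long totalBytes = 7_600_000_000L;
            long rowCount = 161_000L;
            long bytesPerRow = totalBytes / rowCount;                 // ~47 KB per row

            // Hypothetical budget: keep a single fetch batch around 512 MB,
            // comfortably inside the 8 GB heap mentioned below.
            long batchBudgetBytes = 512L * 1024 * 1024;
            long suggestedFetchSize = batchBudgetBytes / bytesPerRow; // ~11,000 rows

            System.out.println(bytesPerRow + " bytes/row -> Fetch Size around " + suggestedFetchSize);
        }
    }

Any value in that ballpark bounds the memory per batch regardless of how large
the table grows.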


On Tue, Mar 19, 2024 at 10:56 AM  wrote:

> Hello Joe,
>
> Thanks for the response.
> But I found the solution.
>
> The table is around 7.6 GB (Avro) for 161000 rows, and with Fetch Size
> set to 20 the ExecuteSQL processor was trying to fetch all of the data.
>
> Since the heap size is set to 8 GB,
> I reduced the "Fetch Size" to 1 and it worked.
>
> Regards
>
>
> *Sent:* Tuesday, 19 March 2024 at 14:53
> *From:* "Joe Witt" 
> *To:* users@nifi.apache.org
> *Subject:* Re: java.lang.OutOfMemoryError: Java heap space : NIFI 1.23.2
> Hello
>
> The key output is
>
> java.lang.OutOfMemoryError: Java heap space
> Review batch property options to limit response sizes in the database
> calls.
>
> Thanks
>
>
> On Tue, Mar 19, 2024 at 6:15 AM  wrote:
>
>> Hello
>>
>> I have an ExecuteSQL processor that runs the SQL command "select * from
>> public.table1".
>> It is a PostgreSQL database.
>>
>> Here is the end of the processor's property list:
>>
>> Max Wait Time 0 seconds
>> Normalize Table/Column Names false
>> Use Avro Logical Types false
>> Compression Format NONE
>> Default Decimal Precision 10
>> Default Decimal Scale 0
>> Max Rows Per Flow File 0
>> Output Batch Size 0
>> Fetch Size: 20
>> Set Auto Commit :false
>>
>>
>> The same SQL command works in NiFi 1.16.3 with the same configuration.
>>
>> I don't know why it fails. Thanks
>>
>> I need your help... there is a strange error:
>>
>> 2024-03-19 12:58:37,683 ERROR [Load-Balanced Client Thread-6]
>> org.apache.nifi.engine.FlowEngine Uncaught Exception in Runnable task
>> java.lang.OutOfMemoryError: Java heap space
>> 2024-03-19 12:58:37,684 ERROR [Load-Balanced Client Thread-2]
>> org.apache.nifi.engine.FlowEngine Uncaught Exception in Runnable task
>> java.lang.OutOfMemoryError: Java heap space
>> at java.base/java.util.concurrent.CopyOnWriteArrayList.iterator(CopyOnWriteArrayList.java:1024)
>> at java.base/java.util.concurrent.CopyOnWriteArraySet.iterator(CopyOnWriteArraySet.java:389)
>> at org.apache.nifi.controller.queue.clustered.client.async.nio.NioAsyncLoadBalanceClientTask.run(NioAsyncLoadBalanceClientTask.java:54)
>> at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
>> at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>> at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
>> at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>> at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>> at java.base/java.lang.Thread.run(Thread.java:834)
>> 2024-03-19 12:58:48,940 INFO [main-EventThread]
>> o.a.c.f.state.ConnectionStateManager State change: SUSPENDED
>> 2024-03-19 12:58:49,347 ERROR [Timer-Driven Process Thread-9]
>> org.apache.nifi.engine.FlowEngine Uncaught Exception in Runnable task
>> java.lang.OutOfMemoryError: Java heap space
>> 2024-03-19 12:58:49,351 INFO [NiFi Web Server-5835]
>> org.apache.nifi.web.server.RequestLog 138.21.169.37 - CN=admin.plants,
>> OU=NIFI [19/Mar/2024:12:58:48 +] "GET /nifi-api/flow/cluster/summary
>> HTTP/1.1" 200 104 "
>> https://nifi-01:9091/nifi/?processGroupId=0f426c92-018e-1000--3fca1e11=;
>> "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML,
>> like Gecko) Chrome/105.0.0.0 Safari/537.36"
>> 2024-03-19 12:58:48,559 ERROR [Load-Balanced Client Thread-3]
>> org.apache.nifi.engine.FlowEngine Uncaught Exception in Runnable task
>> java.lang.OutOfMemoryError: Java heap space
>> 2024-03-19 12:58:49,745 INFO [NiFi Web Server-5419]
>> org.apache.nifi.web.server.RequestLog 138.21.169.37 - -
>> [19/Mar/2024:12:58:49 +] "POST
>> /rb_bf28073qyu?type=js3=v_4_srv_8_sn_B1E58A3741D2949DD454A88FF8A4BAF3_perc_10_ol_0_mul_1_app-3A4e195de4d0714591_1_app-3A44074a8878754fd3_1_app-3Ad1603d0792f56d4b_1_app-3A8092cfc902bb1761_1_rcs-3Acss_1=8=post=QKHPLOMPDHAQNPHMHHTUOHHKWJHFRNCG-0=1710851904127=https%3A%2F%2Fnifi-01%3A9091%2Fnifi%2F%3FprocessGroupId%3D0f426c92-018e-1000--3fca1e11%26componentIds%3D=3=d1603d0792f56d4b=1268897524=7xpdnw1j=1
>> HTTP/1.1" 200 109 "
>> https://nifi-01:9091/nifi/?processGroupId=0f426c92-018e-1000--3fca1e11=;
>> "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML,
>> like Gecko) Chrome/105.0.0.0 Safari/537.36"
>> 2024-03-19 12:58:57,419 ERROR [Timer-Driven Process Thread-5]
>> org.apache.nifi.engine.FlowEngine Uncaught Exception in Runnable task
>> java.lang.OutOfMemoryError: Java heap space
>> 2024-03-19 12:58:50,209 WARN [NiFi Web Server-5689]
>> 
