Re: NiFi Use case

2020-03-14 Thread Emanuel Oliveira
Hi,

Please share the processors you're using, as it's not clear how you're
implementing what you're describing in English:
- load two tables? What does that mean? Which processor are you using to
create/prepare the data, and which processor are you using to "load"?
- etc..

Once we understand better, I can help you better.. but right now it's simply
too vague.

Best Regards,
*Emanuel Oliveira*



On Sat, Mar 14, 2020 at 8:27 AM Samarendra Sahoo 
wrote:

>
> Hello,
> We have a use case where we have to load two tables, say Customer (here the
> customer ID is a sequence and gets generated while we load the data) and
> purchase_order. While loading purchase_order we need to populate customer_id
> based on the SSN present in the purchase_order table. Since there is this
> dependency, we are trying to build this in one process group: Step 1 - load
> customer, Step 2 - load purchase_order with a dummy customer_id, Step 3 - join
> purchase_order and customer based on SSN and populate customer_id in
> purchase_order.
>
> While doing so, multiple flow files are generated for the customer table,
> as we are loading this data by partition. We would like to know how to
> trigger the next processor only once, when all flow files have been processed
> by the previous processor.
>
> Looking for help, or any better approaches to achieve this?
>
> Thanks
> Sam
>
>
>


Re: not possible to set PG variable if processor referencing it is running ?

2020-02-28 Thread Emanuel Oliveira
Hi Mark, thanks for your feedback.
We will assemble the context details and share them here ASAP.

We're using a Python script + the nipyapi lib to implement a custom way to
execute NiFi flows (rough sketch below):
1. test at the beginning whether the PG is already running
2. set the PG var jobid
3. set the other PG vars
4. execute the PG
5. loop:
   get number of threads + FFs on queues
   if error then exit with error
   if threads=0 and FFs=0 then exit OK
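
As a rough sketch of that sequence with nipyapi (simplified, not our actual
script; the canvas helper calls exist in nipyapi, but exact status field names
such as flow_files_queued are my assumption and may vary by nipyapi/NiFi
version):

import time
import nipyapi

nipyapi.utils.set_endpoint('http://localhost:8080/nifi-api')

# steps 1-3: locate the PG and set its variables before starting it
pg = nipyapi.canvas.get_process_group('my_flow')  # lookup by name
nipyapi.canvas.update_variable_registry(pg, [('JOBID', 'job-001')])

# step 4: start the PG
nipyapi.canvas.schedule_process_group(pg.id, scheduled=True)

# step 5: poll until no active threads and no queued flowfiles remain
# (the error-check branch is omitted for brevity)
while True:
    pg = nipyapi.canvas.get_process_group(pg.id, identifier_type='id')
    snap = pg.status.aggregate_snapshot
    if snap.active_thread_count == 0 and snap.flow_files_queued == 0:
        break  # threads=0 and ffs=0 -> exit OK
    time.sleep(5)

nipyapi.canvas.schedule_process_group(pg.id, scheduled=False)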

Next week I'll talk to the colleague more familiar with our Python script to
understand the exact sequence of NiFi REST API calls.. hopefully it's something
in our script.. but let's see.

Thanks,
Emanuel O.

On Thu 27 Feb 2020, 13:08 Mark Payne,  wrote:

> Emanuel,
>
> It sounds like a bug. When a variable is set, it should stop any
> processors currently referencing the variable, add the variable, and then
> restart the affected processors.
>
> Can you include the full stack trace? What version of NiFi? Are you
> changing the value of an existing variable or adding a new variable to the
> Process Group? A template of the flow would be helpful also, if that is
> something that you can provide.
>
> Thanks
> -Mark
>
> On Feb 26, 2020, at 6:14 AM, Oliveira, Emanuel 
> wrote:
>
> Hi all,
>
>
> We're using a shell script/Python with nipyapi (a client for the NiFi REST
> API) to set a
> PG variable:
> JOBID=
>
> But we'd rather rename things so the LogMessage prefix uses
> ${job_jobid} instead of the PG var ${JOBID}:
> JOBID:${job_jobid}||
>
> Today, to our surprise, when we changed the Python script to set a new PG
> var:
> job_jobid
>
> we got the error:
> Cannot update variable 'job_jobid' because it is referenced by LogMessage
>
> Is this known/expected behaviour? I'm surprised enough to think this
> doesn't make sense..
>
> As a workaround we could:
> *PG variables to be set externally - inputs for the flow:*
> FLOW_JOBID
> FLOW_BEGIN_DT
> (xx)
>
> *Add an UpdateAttribute processor to create attributes as copies of the
> respective PG vars:*
> job_jobid = ${FLOW_JOBID}
> job_begin_dt = ${FLOW_BEGIN_DT}
>
>
>
> Thanks//Regards,
> *Emanuel Oliveira*
> Senior Data Engineer
>
>
>


Re: Suggestions for Flow Development Lifecycle

2020-02-25 Thread Emanuel Oliveira
I know that when we export XML as a template, IDs are not preserved (what we
have on the canvas vs. in the generated template XML).
I wonder about IDs from a NiFi Registry deployment: are they preserved?


Best Regards,
*Emanuel Oliveira*



On Tue, Feb 25, 2020 at 8:57 PM Kevin Doran  wrote:

> I’ve always thought along the lines Otto suggests, that eventually, given
> some way of formatting the diff, there would also be some visual tool in
> the ecosystem that would help visualize that diff and could be used
> specifically in the context of reviewing/merging changes.
>
> Lots of good discussion on this thread, thanks all!
>
> Kevin
>
> On Feb 25, 2020, at 15:44, Otto Fowler  wrote:
>
> -or- when diffing, some other representation could be presented other than
> straight xml.
>
> The diff ( if run / visualized in nifi ) needn’t be byte for byte, it can
> be logical can’t it?
>
> On February 25, 2020 at 14:58:24, Eric Secules (esecu...@gmail.com) wrote:
>
> Hi Ken,
>
>
>> I feel like there are also issues in trying to build a pull-request-like
>> workflow, in that most NiFi design is done visually on the canvas but there
>> isn't a good way, that I know of, to represent differences between versions
>> in a visual way. Even if you enabled pull request-like functionality, I
>> don't know that I would expect users to approve changes by examining diffs
>> in the XML files.
>>
>
> I agree; doing PRs can be improved by starting up a nifi instance and
> registry on top of the feature branch, so you can view the changes in a
> safe environment before approving. But this doesn't solve the issue of
> merging diverged versions. For that we'd need a visual representation of a
> diff. Maybe entering a 3 way diff mode with side-by-side canvases and no
> active processors where the differences are highlighted. Doing merge
> resolution at the XML level is a non-starter for me.
>
> I've tried out restarting the registry, which works for receiving new
> changes. I have also had some trouble with losing previous version history
> when restarting a registry, but the main issue would still be merging
> diverged branches. Maybe it would help to have some checkout mechanism and
> return to the days of centralized version control if merging is too
> difficult to implement.
>
> -Eric
>
> On Tue, Feb 25, 2020 at 9:36 AM Ken Swanson  wrote:
>
>> I have a lot of interest in this too, as my team has run into these
>> issues as well.
>>
>> I feel like there are also issues in trying to build a pull-request-like
>> workflow, in that most NiFi design is done visually on the canvas but there
>> isn't a good way, that I know of, to represent differences between versions
>> in a visual way. Even if you enabled pull request-like functionality, I
>> don't know that I would expect users to approve changes by examining diffs
>> in the XML files.
>>
>> However, I did have one thing to add that I hope might help:
>> On Tue, Feb 25, 2020 at 12:32 AM Eric Secules  wrote:
>>
>>> I've also tried backing up my local registry to a separate branch in git
>>> and manually merging it with the branch that central-reg backs up to, but
>>> these git branches are glorified backups and the registry doesn't seem to
>>> be built to pull updates from them. On top of that doing a code review on
>>> the generated JSON describing a process group is difficult and I ran into
>>> several merge conflicts testing out a very simple merge where the target
>>> branch diverged slightly from the feature branch.
>>>
>>
>> From what I've seen, the registry will use any updates that are in the
>> repo when it starts up. So you should be able to do something like this:
>>
>>1. Make the registry pull (or clone) from the backup as part of its
>>startup process
>>2. Restart the registry when a merge to master is made. AFAIK this
>>should not affect any NiFi instances that are connected to the registry;
>>they should continue to work once the registry comes back up
>>
>> It's a bit clunky but I think it works.
>>
>> -Ken
>>
>
>


Re: upgrade flow with running components using state (the default provided by zookeeper)

2020-02-08 Thread Emanuel Oliveira
I see. So how easy would it be to migrate the state of specific processors,
and how?
I guess using the ZK CLI? Or grabbing the state from the XML directly?

Best Regards,
*Emanuel Oliveira*



On Sat, Feb 8, 2020 at 6:36 PM Bryan Bende  wrote:

> There is a ZK migrator in the toolkit that can transfer all state from
> one ZK to another, for the scenario where you are moving everything to a
> new cluster.
>
> Other than that, it is not part of versioned flows because the state is
> specific to the environment.
>
> On Sat, Feb 8, 2020 at 12:41 PM Emanuel Oliveira 
> wrote:
>
>> Great, good to know. Thanks Bryan, I'll surely take a look at that ZK CLI.
>>
>> One last question: good to know that deploying a new version of a PG flow via
>> Registry keeps the state of processors (linked by their UUID).
>> And what about deploying the flow into another cluster B when it has
>> already been running on cluster A.. how do we copy/move the state of a
>> processor from cluster A into cluster B?
>>
>> Best Regards,
>> *Emanuel Oliveira*
>>
>>
>>
>> On Sat, Feb 8, 2020 at 4:59 PM Bryan Bende  wrote:
>>
>>> Yes, with Registry, components are upgraded in place so their IDs are not
>>> changing.
>>>
>>> The distributed cache was the old way of storing state in 0.x before the
>>> internal state manager was introduced. It is still there to migrate state
>>> in the event someone upgrades from an old 0.x release, but is not used
>>> otherwise and should be removed in 2.0.0.
>>>
>>> The state managers are defined in state-management.xml, the local one is
>>> a write ahead log and the clustered one is ZooKeeper by default. You could
>>> use ZK CLI to inspect what is stored.
>>>
>>> On Sat, Feb 8, 2020 at 4:33 AM Emanuel Oliveira 
>>> wrote:
>>>
>>>> Yes Bryan, we developed a process to deploy from the Registry using the
>>>> NiFi REST API.
>>>>
>>>> I see, so state is physically tied to processor UUIDs.
>>>> 1. When importing templates, the UUIDs change. So reading your suggestion
>>>> hints that deploying the same PG (same or newer version, where our stateful
>>>> processor remains the same) from the Registry via the REST API shall keep
>>>> the UUIDs in both deploys, is that it?
>>>>
>>>> 2. Where do processor states get stored physically at the cluster and at
>>>> the local level? I suppose processors internally use the so-called
>>>> ZooKeeper to maintain state? Additionally, are just "state" files synced
>>>> between nodes, or are there NiFi or ZooKeeper or some other type of APIs
>>>> being used?
>>>>
>>>> 3. Yesterday we had a flow using ListHdfs + FetchHdfs + PutS3, with
>>>> ListHdfs using internal state management (that is, the property "Distributed
>>>> Cache Service" is not set; I think this means the processor uses the default
>>>> NiFi internal state system, which is managed/implemented by ZooKeeper?).
>>>> Something strange happened: despite 1000's of files getting pulled/stored in
>>>> S3, right-clicking ListHdfs the state was empty.. there were no key/values in
>>>> the list.. and the processor had been running for 2 days. Aren't we supposed
>>>> to be able to inspect state? What could we do next time to troubleshoot
>>>> this?
>>>>
>>>>
>>>> Thanks in advance,
>>>> Emanuel O.
>>>>
>>>>
>>>>
>>>> On Fri 7 Feb 2020, 21:54 Bryan Bende,  wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> How are you upgrading the flow?
>>>>>
>>>>> If you mean using NiFi Registry and selecting Change Version to a new
>>>>> version, then yes it will retain state.
>>>>>
>>>>> Other than that, probably not because the state is tied to the UUID of
>>>>> the processor, so if you used templates or some other approach, you
>>>>> will likely get a new UUID for the processor in the new flow.
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Bryan
>>>>>
>>>>> On Fri, Feb 7, 2020 at 4:44 PM Emanuel Oliveira 
>>>>> wrote:
>>>>> >
>>>>> > Hi,
>>>>> >
>>>>> > I wonder.. Is it possible to upgrade PG flow to new version when its
>>>>> contains processors using state ?
>>>>> > FYI The new flow using exact same processors/versions its just minor
>>>>> tweaks on some properties etc..
>>>>> >
>>>>> > Best Regards,
>>>>> > Emanuel Oliveira
>>>>> >
>>>>>
>>>> --
>>> Sent from Gmail Mobile
>>>
>> --
> Sent from Gmail Mobile
>


Re: upgrade flow with running components using state (the default provided by zookeeper)

2020-02-08 Thread Emanuel Oliveira
Great, good to know. Thanks Bryan, I'll surely take a look at that ZK CLI.
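
For reference, my rough understanding of what that inspection could look like
(a sketch only, assuming the default /nifi root node and component path from
state-management.xml; adjust to your install):

zkCli.sh -server <zk-host>:2181
ls /nifi/components
get /nifi/components/<processor-uuid>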

One last question: good to know that deploying a new version of a PG flow via
Registry keeps the state of processors (linked by their UUID).
And what about deploying the flow into another cluster B when it has already
been running on cluster A.. how do we copy/move the state of a processor from
cluster A into cluster B?

Best Regards,
*Emanuel Oliveira*



On Sat, Feb 8, 2020 at 4:59 PM Bryan Bende  wrote:

> Yes, with Registry, components are upgraded in place so their IDs are not
> changing.
>
> The distributed cache was the old way of storing state in 0.x before the
> internal state manager was introduced. It is still there to migrate state
> in the event someone upgrades from an old 0.x release, but is not used
> otherwise and should be removed in 2.0.0.
>
> The state managers are defined in state-management.xml, the local one is a
> write ahead log and the clustered one is ZooKeeper by default. You could
> use ZK CLI to inspect what is stored.
>
> On Sat, Feb 8, 2020 at 4:33 AM Emanuel Oliveira 
> wrote:
>
>> Yes Bryan, we developed a process to deploy from the Registry using the NiFi
>> REST API.
>>
>> I see, so state is physically tied to processor UUIDs.
>> 1. When importing templates, the UUIDs change. So reading your suggestion
>> hints that deploying the same PG (same or newer version, where our stateful
>> processor remains the same) from the Registry via the REST API shall keep
>> the UUIDs in both deploys, is that it?
>>
>> 2. Where do processor states get stored physically at the cluster and at the
>> local level? I suppose processors internally use the so-called ZooKeeper to
>> maintain state? Additionally, are just "state" files synced
>> between nodes, or are there NiFi or ZooKeeper or some other type of APIs
>> being used?
>>
>> 3. Yesterday we had a flow using ListHdfs + FetchHdfs + PutS3, with
>> ListHdfs using internal state management (that is, the property "Distributed
>> Cache Service" is not set; I think this means the processor uses the default
>> NiFi internal state system, which is managed/implemented by ZooKeeper?).
>> Something strange happened: despite 1000's of files getting pulled/stored in
>> S3, right-clicking ListHdfs the state was empty.. there were no key/values in
>> the list.. and the processor had been running for 2 days. Aren't we supposed
>> to be able to inspect state? What could we do next time to troubleshoot
>> this?
>>
>>
>> Thanks in advance,
>> Emanuel O.
>>
>>
>>
>> On Fri 7 Feb 2020, 21:54 Bryan Bende,  wrote:
>>
>>> Hello,
>>>
>>> How are you upgrading the flow?
>>>
>>> If you mean using NiFi Registry and selecting Change Version to a new
>>> version, then yes it will retain state.
>>>
>>> Other than that, probably not because the state is tied to the UUID of
>>> the processor, so if you used templates or some other approach, you
>>> will likely get a new UUID for the processor in the new flow.
>>>
>>> Thanks,
>>>
>>> Bryan
>>>
>>> On Fri, Feb 7, 2020 at 4:44 PM Emanuel Oliveira 
>>> wrote:
>>> >
>>> > Hi,
>>> >
>>> > I wonder.. Is it possible to upgrade a PG flow to a new version when it
>>> contains processors using state?
>>> > FYI, the new flow uses the exact same processors/versions; it's just minor
>>> tweaks on some properties etc..
>>> >
>>> > Best Regards,
>>> > Emanuel Oliveira
>>> >
>>>
>> --
> Sent from Gmail Mobile
>


Re: upgrade flow with running components using state (the default provided by zookeeper)

2020-02-08 Thread Emanuel Oliveira
Yes Bryan, we developed a process to deploy from the Registry using the NiFi
REST API.

I see, so state is physically tied to processor UUIDs.
1. When importing templates, the UUIDs change. So reading your suggestion
hints that deploying the same PG (same or newer version, where our stateful
processor remains the same) from the Registry via the REST API shall keep
the UUIDs in both deploys, is that it?

2. Where do processor states get stored physically at the cluster and at the
local level? I suppose processors internally use the so-called ZooKeeper to
maintain state? Additionally, are just "state" files synced
between nodes, or are there NiFi or ZooKeeper or some other type of APIs
being used?

3. Yesterday we had a flow using ListHdfs + FetchHdfs + PutS3, with
ListHdfs using internal state management (that is, the property "Distributed
Cache Service" is not set; I think this means the processor uses the default
NiFi internal state system, which is managed/implemented by ZooKeeper?).
Something strange happened: despite 1000's of files getting pulled/stored in
S3, right-clicking ListHdfs the state was empty.. there were no key/values in
the list.. and the processor had been running for 2 days. Aren't we supposed
to be able to inspect state? What could we do next time to troubleshoot
this?


Thanks in advance,
Emanuel O.


On Fri 7 Feb 2020, 21:54 Bryan Bende,  wrote:

> Hello,
>
> How are you upgrading the flow?
>
> If you mean using NiFi Registry and selecting Change Version to a new
> version, then yes it will retain state.
>
> Other than that, probably not because the state is tied to the UUID of
> the processor, so if you used templates or some other approach, you
> will likely get a new UUID for the processor in the new flow.
>
> Thanks,
>
> Bryan
>
> On Fri, Feb 7, 2020 at 4:44 PM Emanuel Oliveira 
> wrote:
> >
> > Hi,
> >
> > I wonder.. Is it possible to upgrade a PG flow to a new version when it
> contains processors using state?
> > FYI, the new flow uses the exact same processors/versions; it's just minor
> tweaks on some properties etc..
> >
> > Best Regards,
> > Emanuel Oliveira
> >
>


Re: parsing html table

2020-02-08 Thread Emanuel Oliveira
Hi,

With a simple processor,
https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.5.0/org.apache.nifi.processors.standard.ReplaceText/
you can find(regex) and then replace(regex), meaning you have all the power
of regex at your disposal to modify FF contents.

For example, you could simply use (the <td> tags here stand in for whatever
markup wraps your values):
search(regex)= <td>(.*)</td>
replace(regex)= \1,

I'm writing this while having breakfast, so you may want to test, but
basically, as you may know, each pair of parentheses in a regex is a regex
group, and you can refer to that group's contents with \1 for the 1st group,
\2 for the second, etc. (left to right).

So the limit of replace(regex) is your imagination; you can mimic CSV or
whatever format you want. The comma in replace(regex) simply adds a comma,
so your HTML file magically becomes a list of values :)
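
If you want to sanity-check the pattern outside NiFi first, here is the same
find/replace in Python (the pattern is illustrative only; note the non-greedy
(.*?), which is safer when several cells share one line):

import re

html = "<tr><td>a</td><td>b</td></tr>"
# \1 refers to the contents of the 1st capture group
print(re.sub(r"<td>(.*?)</td>", r"\1,", html))  # -> <tr>a,b,</tr>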

Hope this helps,

Emanuel O.

On Sat 8 Feb 2020, 01:03 Youzha,  wrote:

> Hi, is there any way to parse an html table and insert it into mysql?
> I want to put the table cell values into my sql table.
> Please suggest
>


upgrade flow with running components using state (the default provided by zookeeper)

2020-02-07 Thread Emanuel Oliveira
Hi,

I wonder.. Is it possible to upgrade a PG flow to a new version when it
contains processors using state?
FYI, the new flow uses the exact same processors/versions; it's just minor
tweaks on some properties etc..

Best Regards,
*Emanuel Oliveira*


Re: Execute Script from Configuration in a Generic Flow

2020-02-05 Thread Emanuel Oliveira
Hi,

The ReplaceText processor, replacing (.*) with ${yourattribute}, replaces the
FF content with the expression result (example with 1 attribute).
But looking into the ExecuteScript documentation
https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-scripting-nar/1.5.0/org.apache.nifi.processors.script.ExecuteScript/index.html
it seems only "Script File" and "Module Directory" support Expression
Language: true (aka use of attributes etc.).
Sadly, "Script Body" doesn't support Expression Language, meaning you can't
load the script body from ${yourattribute}.

You may try the workaround of saving the script as a local file and then
setting both the directory + script file dynamically, as they support
expressions; a sketch follows.
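
A sketch of that workaround as a flow outline (property values illustrative;
also worth verifying whether "Script File" expression language is evaluated
against flowfile attributes or only the variable registry in your version):

ReplaceText    Replacement Value = ${yourattribute}   (script body -> FF content)
PutFile        Directory = /tmp/nifi-scripts          (persist it as a file)
ExecuteScript  Script File = /tmp/nifi-scripts/${filename}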

Hope this helps,
Emanuel O.




Best Regards,
*Emanuel Oliveira*



On Wed, Feb 5, 2020 at 7:15 PM Anurag Sharma 
wrote:

> Hi,
>
>
> We are trying to create a generic data flow where configuration will be
> received as JSON. Configuration will have a JavaScript to be executed.
>
>
>
> We are stuck on how to execute the script we received in configuration, as
> the ExecuteScript processor needs either scriptPath or scriptBody. But in
> our case the script will be in a flowFile attribute.
>
>
>
> What are all possible ways to achieve this functionality?
>
>
>
> Along with this, the script should not be able to access the 'session' and
> 'context' variables available in the ExecuteScript processor. So how do we
> sanitise the script before execution?
>
>
> Regards
>
> Anurag
>


Re: Validating CSV File

2020-01-07 Thread Emanuel Oliveira
ValidateCsv is the most robust option (it handles missing fields, as you need);
it doesn't use Avro schemas, instead using an inline sequence of functions to
accomplish anything you want (nulls OK or not, types, regex, etc.); see the
sketch below.
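
For example, a sketch of what the Schema property can look like (cell processor
names are from the SuperCSV operators listed in the ValidateCsv docs; please
double-check against your NiFi version) for a 3-column file where the first two
columns must be non-empty strings and the third must parse as an integer:

Schema: StrNotNullOrEmpty(), StrNotNullOrEmpty(), ParseInt()
Header: true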

In a recent project, while striving for maximum data quality, I tried all the
different processors and options, and ValidateCsv is the clear winner for
CSVs.

Emanuel O.

On Mon 6 Jan 2020, 23:36 Matt Burgess,  wrote:

> What about ValidateCsv, could that do what you want?
>
> Sent from my iPhone
>
> On Jan 6, 2020, at 6:10 PM, Shawn Weeks  wrote:
>
> 
>
> I’m poking around to see if I can make the csv parsers fail on a schema
> mismatch like that. A stream command would be a good option though.
>
>
>
> Thanks
>
> Shawn
>
>
>
> *From: *Mike Thomsen 
> *Reply-To: *"users@nifi.apache.org" 
> *Date: *Monday, January 6, 2020 at 4:35 PM
> *To: *"users@nifi.apache.org" 
> *Subject: *Re: Validating CSV File
>
>
>
> We have a lot of the same issues where I work, and our solution is to use
> ExecuteStreamCommand to pass CSVs off to Python scripts that will read
> stdin line by line to check to see if the export isn't screwed up. Some of
> our sources are good and we don't have to do that, but others are
> minefields in terms of the quality of the upstream data source, and that's
> the only way we've found where we can predictably handle such things.
>
>
>
> On Mon, Jan 6, 2020 at 4:57 PM Shawn Weeks 
> wrote:
>
> That's the challenge: the values can be null, but I want to know when fields
> are missing (aka not enough delimiters). I run into a common scenario where
> line feeds end up in the data, making a short row. Currently the reader just
> ignores the fact that there aren't enough delimiters and makes them null.
>
> On 1/6/20, 3:50 PM, "Matt Burgess"  wrote:
>
> Shawn,
>
> Your schema indicates that the fields are optional because of the
> "type" :  ["null", "string"] , so IIRC they won't be marked as invalid
> because they are treated as null (I'm not sure there's a difference in
> the code between missing and null fields).
>
> You can try "type": "string" in ValidateRecord to see if that fixes
> it, or there's a "StrNotNullOrEmpty" operator in ValidateCSV.
>
> Regards,
> Matt
>
> On Mon, Jan 6, 2020 at 4:35 PM Shawn Weeks 
> wrote:
> >
> > I'm trying to validate that a csv file has the number of fields
> > defined in its Avro schema. Consider the following schema and CSVs. I
> > would like to be able to reject the invalid csv as missing fields.
> >
> > {
> >    "type" : "record",
> >    "namespace" : "nifi",
> >    "name" : "nifi",
> >    "fields" : [
> >       { "name" : "c1" , "type" : ["null", "string"] },
> >       { "name" : "c2" , "type" : ["null", "string"] },
> >       { "name" : "c3" , "type" : ["null", "string"] }
> >    ]
> > }
> >
> > Good CSV
> > c1,c2,c3
> > hello,world,1
> > hello,world,
> > hello,,
> >
> > Bad CSV
> > c1,c2,c3
> > hello,world,1
> > hello,world
> > hello
> >
>
>


Re: NiFi ValidateRecord - unable to handle missing mandatory ARRAY ?

2020-01-06 Thread Emanuel Oliveira
Thanks Pierre!

On Mon 6 Jan 2020, 17:06 Pierre Villard, 
wrote:

> Hi Emanuel,
>
> The PR is currently under review so it would not be included in NiFi
> 1.10.0 (which is already released). We recently discussed releasing a
> new NiFi version (1.10.1 or 1.11.0), and if the PR is merged before such a
> release, it would certainly be included in that version.
>
> Hope it makes sense,
> Pierre
>
>
> On Mon, 6 Jan 2020 at 22:08, Oliveira, Emanuel
> wrote:
>
>> Thanks Matt and Mark!
>> We're still on version
>> 1.8.0
>> 10/22/2018 23:48:30 EDT
>> Tagged nifi-1.8.0-RC3
>>
>> Current version is 1.10
>>
>> Out of curiosity, when could we expect this fix to be available? Would it
>> mean we upgrade to 1.10? Thanks.
>>
>> Thanks//Regards,
>> Emanuel Oliveira
>>
>>
>>
>> -Original Message-
>> From: Matt Burgess 
>> Sent: Friday 20 December 2019 17:52
>> To: users@nifi.apache.org
>> Subject: Re: NiFi ValidateRecord - unable to handle missing mandatory
>> ARRAY ?
>>
>> Mark is spot-on with the diagnosis, a default empty array is being
>> created for the missing field even if no default value is specified in the
>> schema. All it needs is an extra null check in order to return null as the
>> default value, then the record is marked invalid as expected.
>>
>> I have written up NIFI-6963 [1] to cover this, and issued a PR to fix it
>> [2]. Mark, would you kindly do the honors of a review? Please and thanks!
>>
>> -Matt
>>
>> [1] https://issues.apache.org/jira/browse/NIFI-6963
>> [2] https://github.com/apache/nifi/pull/3948
>>
>> On Wed, Dec 11, 2019 at 10:25 AM Mark Payne  wrote:
>> >
>> > Emanuel,
>> >
>> > I looked into this a week or so ago, but haven't had a chance to
>> resolve the issue yet. It does appear to be a bug. Specifically, I believe
>> the bug is here [1].  When we create a RecordSchema from the Avro Schema,
>> we set the default value for the array to an empty array, instead of null.
>> Because of this, when the JSON is parsed, we end up creating a Record with
>> an empty array for the "Record" field instead of a null. As a result, the
>> Record is considered valid because it does have an array (it's just empty).
>> I think it *should* be a null value instead.
>> >
>> > It looks like this was introduced in NIFI-4893 [2]. We can easily
>> change it to just return a null value for the default, but that does result
>> in two of the unit tests added in NIFI-4893 failing. It may be that those
>> unit tests need to be fixed, or it may be that such a change does break
>> something. I just haven't had a chance yet to dig that far into it.
>> >
>> > If you're someone who is comfortable digging into the code and making
>> the updates, then please do and I'm happy to review a PR as soon as I'm
>> able.
>> >
>> > Thanks
>> > -Mark
>> >
>> >
>> > [1]
>> > https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-exten
>> > sion-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/
>> > apache/nifi/avro/AvroTypeUtil.java#L629-L631
>> >
>> > [2] https://issues.apache.org/jira/browse/NIFI-4893
>> >
>> >
>> >
>> > On Dec 11, 2019, at 8:02 AM, Oliveira, Emanuel <
>> emanuel.olive...@fmr.com> wrote:
>> >
> Anyone knowledgeable on Avro schemas, can you please confirm/suggest whether
> this inability to invalidate a JSON payload missing an array at the root,
> when allowing extra fields = true, is normal?
>
> There are 2 options:
>
> ValidateRecord.Allow Extra Fields=false -> need to supply the full schema
> ValidateRecord.Allow Extra Fields=true -> this is what I've been
> testing/want, a way to supply a schema with only the mandatory fields.
>> >
>> >
> I want 2 mandatory things: an array with at least 1 element having
> eventVersion, so the minimal JSON should be:
>> > { (..)
>> >"Records": [{
>> >  "eventVersion": "aaa"
>> >  (..)
>> >   }
>> >]
>> >(..)
>> > }
>> >
> The problem is that ValidateRecord considers the FF valid if the "Records"
> array is missing from the root:
>> > {
>> >"Service": &

Re: Need help with DetectDuplicate

2019-12-24 Thread Emanuel Oliveira
Hi,

Depending on how your cluster is set up, you may need to add/configure an SSL
controller service?

Emanuel

On Tue 24 Dec 2019, 18:16 William Gosse, 
wrote:

> I'm trying to use the DetectDuplicate processor but not having much luck.
> Here's the config:
>
> Cache Entry Identifier: ${resourceId}
> FlowFile Description: Ingestion
> Age Off Duration: 60 sec
> Cache The Entry Identifier: true
> Distributed Cache Service: DistributedMapCacheClientService
>
>
>
> I created and enabled a DistributedMapCacheClientService. Here's its
> config:
>
> Server Hostname: localhost
> Server Port: 4557
> SSL Context Service: No value set
> Communications Timeout: 30 secs
>
>
>
> When I run it I get the following error:
>
> 2019-12-24 13:14:05,355 ERROR [Timer-Driven Process Thread-9]
> o.a.n.p.standard.DetectDuplicate
> DetectDuplicate[id=38cb8a64-016f-1000-b55b-f6c4e0f69f61] Unable to
> communicate with cache when processing
> StandardFlowFileRecord[uuid=7f049d8e-1d04-4fee-9f04-320d6980bc55,claim=StandardContentClaim
> [resourceClaim=StandardResourceClaim[id=1577202782598-43,
> container=default, section=43], offset=74111,
> length=4528],offset=0,name=84068ffb-69b1-4471-abbd-29243d3be39e,size=4528]
> due to java.net.ConnectException: Connection refused: no further
> information: java.net.ConnectException: Connection refused: no further
> information
>
> java.net.ConnectException: Connection refused: no further information
>     at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>     at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
>     at sun.nio.ch.SocketAdaptor.connect(SocketAdaptor.java:111)
>     at org.apache.nifi.distributed.cache.client.StandardCommsSession.<init>(StandardCommsSession.java:52)
>     at org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService.createCommsSession(DistributedMapCacheClientService.java:410)
>     at org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService.leaseCommsSession(DistributedMapCacheClientService.java:425)
>     at org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService.withCommsSession(DistributedMapCacheClientService.java:491)
>     at org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService.getAndPutIfAbsent(DistributedMapCacheClientService.java:174)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler.invoke(StandardControllerServiceInvocationHandler.java:87)
>     at com.sun.proxy.$Proxy142.getAndPutIfAbsent(Unknown Source)
>     at org.apache.nifi.processors.standard.DetectDuplicate.onTrigger(DetectDuplicate.java:183)
>     at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
>     at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1176)
>     at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:213)
>     at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
>     at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
>
>
>
> Not sure what's missing?
>


wait/notify - serial processing (gate flipflop)

2019-12-24 Thread Emanuel Oliveira
Hi,

I've been testing the wait/notify pattern, especially for a process-one-by-one
pattern (an HTTP REST API can only process the 2nd request after the previous
one has completed).

1. I managed to make it work using the technique I found here:
https://community.cloudera.com/t5/Community-Articles/Trigger-based-Serial-Data-processing-in-NiFi-using-Wait-and/ta-p/248308
Basically it tags each FF via a stateful UpdateAttribute and then creates a
distributed cache key "Release Signal Identifier" with this token (1, 2, ..),
and it works fine. Wait removes the key after consuming/releasing it.
2. But I was unable to understand how to implement it using the tooltips'
suggestion of the special values zero (0) and one (1) on both
Wait.Releasable FlowFile Count and Notify.Signal Counter Delta, which seem to
suggest it's possible to implement exactly what I want: a flip-flop gate,
where some processing happens serially, one at a time. It seems both the Wait
and Notify Release Signal Identifier could be a static value (it doesn't
matter), as the magic values of zero (close gate) and one (open gate) should
be enough... but I wasn't able to make this work. Any suggestion on how to
build a flip-flop / singleton / serial-processing pattern using the magic
values and just 1 Release Signal Identifier?

wait.Releasable FlowFile Count
"(..)Zero (0) has a special meaning, any number of FlowFiles can be
released as long as signal count matches target.(..)"

notify.Signal Counter Delta
"(..)Zero (0) has a special meaning, it clears target count back to 0,
which is especially useful when used with Wait Releasable FlowFile Count =
Zero (0) mode, to provide 'open-close-gate' type of flow control. One (1)
can open a corresponding Wait processor, and Zero (0) can negate it as if
closing a gate.(..)".

Ideally it would be great if someone could share a minimalistic XML template
just demoing this capability suggested by both Wait/Notify processors'
documentation.


Thanks,
Emanuel Oliveira


Wait - unable to feed-in expired FF

2019-12-24 Thread Emanuel Oliveira
Hi,

I'm now enjoying 2 weeks of Xmas/New Year holidays, but I spent a week testing
the Wait/Notify patterns in deep detail, trying different combinations of
property values and then using FetchDistributedMapCache to see internally
how key values change over time.

I found out during testing that when an expired FF exits Wait from its Expired
relationship into a LogMessage and is connected back into the same Wait again,
"strangely" the FF exits straight away to the Expired relationship again
(without the 10 min expiration defined on the Wait).

So it seems Wait is somehow storing expired FFs in the cache or somewhere else?

Thanks for clarification,
Emanuel Oliveira


Re: NiFi ValidateRecord - unable to handle missing mandatory ARRAY ?

2019-12-19 Thread Emanuel Oliveira
Just an additional thought on this: I'm not sure if it's part of the Avro
schema specification, but it would be nice to be able to "inform" the schema
of cardinalities.
For example, by default specified records or fields must exist (cardinality
1..1), but for arrays it would be nice to be able to specify cardinalities
like the following (see the sketch after this list):
- 0..n -- can be empty (whether the array tag itself must exist or not is TBD).
- 1..n -- at least 1 element needed.
- 1..1 -- one and only one element in the array (i.e. [0]).
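
For what it's worth, standard Avro can already express the "array itself is
optional" case via a union with null (though I see no way to require a minimum
element count). A sketch:

optional (may be absent/null):
{ "name": "Records",
  "type": ["null", { "type": "array", "items": "string" }],
  "default": null }

required (the field must be present):
{ "name": "Records",
  "type": { "type": "array", "items": "string" } }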

Best Regards,
*Emanuel Oliveira*



On Thu, Dec 12, 2019 at 11:23 AM Oliveira, Emanuel 
wrote:

> Hi Juan and others,
>
> Attaching a reproducible test flow for your convenience.
>
> Once again, the objective is to have 2 mandatory things in the JSON:
>
>- 1 array "Records" in the root.
>- and each element must have the attribute eventVersion.
>
> There are 3 GenerateFlowFile processors to test the 3 different scenarios:
>
>- Problem | missing array and the FF still validates.
>- OK | array "Records" present but missing eventVersion. Invalid as
>expected.
>- OK | both mandatory things present: array "Records" + "eventVersion".
>
>
> Thanks//Regards,
>
> *Emanuel Oliveira*
>
> Senior Oracle/Data Engineer | CTG | Galway
> TEL ext: 353 – (0)91-74  4971 | int: 8-737 4971 *|*  who's who
> <http://fidelitycentral.fmr.com/ww/a639704>
>
>
>
> *From:* Juan Pablo Gardella 
> *Sent:* Wednesday 11 December 2019 16:18
> *To:* users@nifi.apache.org
> *Subject:* Re: NiFi ValidateRecord - unable to handle missing mandatory
> ARRAY ?
>
>
> The bug https://issues.apache.org/jira/browse/NIFI-4893 was detected by
> myself. Do you have a reproducible flow to validate it?
>
>
>
> On Wed, 11 Dec 2019 at 12:54, Oliveira, Emanuel 
> wrote:
>
> Oh I see, your analysis makes sense, but sorry, I last did Java 20 years
> ago; nowadays I'm mostly a data engineer (Oracle DB, ETL tools, custom
> migrations, Snowflake and lately NiFi).. so count on me to detect
> opportunities to improve things, but I'm not able to change the base
> code/tests.
>
> Thanks so much for your time and analysis; let's wait for the community to
> step up to do the fix and update/run the unit tests 😊
>
>
>
> Thanks//Regards,
>
> *Emanuel Oliveira*
>
> Senior Oracle/Data Engineer | CTG | Galway
> TEL ext: 353 – (0)91-74  4971 | int: 8-737 4971 *|*  who's who
> <http://fidelitycentral.fmr.com/ww/a639704>
>
>
>
> *From:* Mark Payne 
> *Sent:* Wednesday 11 December 2019 15:25
> *To:* users@nifi.apache.org
> *Subject:* Re: NiFi ValidateRecord - unable to handle missing mandatory
> ARRAY ?
>
>
>
> Emanuel,
>
>
>
> I looked into this a week or so ago, but haven't had a chance to resolve
> the issue yet. It does appear to be a bug. Specifically, I believe the bug
> is here [1].  When we create a RecordSchema from the Avro Schema, we set
> the default value for the array to an empty array, instead of null. Because
> of this, when the JSON is parsed, we end up creating a Record with an empty
> array for the "Record" field instead of a null. As a result, the Record is
> considered valid because it does have an array (it's just empty). I think
> it *should* be a null value instead.
>
>
>
> It looks like this was introduced in NIFI-4893 [2]. We can easily change
> it to just return a null value for the default, but that does result in two
> of the unit tests added in NIFI-4893 failing. It may be that those unit
> tests need to be fixed, or it may be that such a change does break
> something. I just haven't had a chance yet to dig that far into it.
>
>
>
> If you're someone who is comfortable digging into the code and making the
> updates, then please do and I'm happy to review a PR as soon as I'm able.
>
>
>
> Thanks
>
> -Mark
>
>
>
>
>
> [1]
> https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java#L629-L631
>
>
>
> [2] https://issues.apache.org/jira/browse/NIFI-4893
>
>
>
>
>
>
>
> On Dec 11, 2019, at 8:02 AM, Oliveira, Emanuel 
> wrote:
>
>
>
> Anyone knowledgeable on Avro schemas, can you please confirm/suggest whether
> this inability to invalidate a JSON payload missing an array at the root,
> when allowing extra fields = true, is normal?
>
> There are 2 options:
>

Re: NiFi ValidateRecord - unable to handle missing mandatory ARRAY ?

2019-12-05 Thread Emanuel Oliveira
Records" ARRAY +
> "eventVersion" on each ARRAY element
> Schema Access Strategy: Use 'Schema Text' Property
> Schema Registry:
> Schema Name: ${schema.name}
> Schema Version:
> Schema Branch:
> Schema Text:
> {
>   "name": "MyName",
>   "type": "record",
>   "namespace": "aa.bb.cc",
>   "fields": [{
>     "name": "Records",
>     "type": {
>       "type": "array",
>       "items": {
>         "name": "Records_record",
>         "type": "record",
>         "fields": [{
>           "name": "eventVersion",
>           "type": "string"
>         }]
>       }
>     }
>   }]
> }
> Date Format:
> Time Format:
> Timestamp Format:
>
> --JsonRecordSetWriter 1.8.0
> Schema Write Strategy: Do Not Write Schema
> Schema Access Strategy: Inherit Record Schema
> Schema Registry:
> Schema Name: ${schema.name}
> Schema Version:
> Schema Branch:
> Schema Text: { "name": "eventVersion", "type": "string" }
> Date Format:
> Time Format:
> Timestamp Format:
> Pretty Print JSON: true
> Suppress Null Values: Never Suppress
> Output Grouping: Array
>
> Thanks in advance,
> Emanuel Oliveira
>