Re: Removing duplicates from data

2017-09-18 Thread Vikram More
I could not find 'PutDatabaseRecord' in the NiFi version I am using
(1.1.0.2.1.2.0-10). Please suggest?

On Tue, Sep 19, 2017 at 12:10 AM, Vikram More 
wrote:

> Hi Koji,
> Thanks for response and helpful links !
>
> NiFi version : 1.1.0.2.1.2.0-10
>
> I am trying to move data from operational system (oracle db) to analytical
> system (postgres db). Postgres table has been model/designed by us (and can
> add primary key). Data from oracle looks like below  (i need to remove
> duplicate record for combination on ColA , ColB)
>
> Col A Col B
> C1 item 1
> C1 item 2
> *C2* *item 3*
> *C2* *item 4*
> *C2* *item 3*
> C3 item 1
> C4 null
> C5 item 5
> C6 item 7
> I will try to explore PutDatabaseRecord processor and see i can achieve
> desired purpose.
>
> Thanks,
> Vikram
>
> On Mon, Sep 18, 2017 at 9:59 PM, Koji Kawamura 
> wrote:
>
>> Hello Vikram,
>>
>> Welcome to NiFi and the community :)
>>
>> Would you elaborate your data flow? And which version you are using?
>> For example, can you share some input data extracted from Oracle? I
>> wonder why you need to remove duplicate records while PostgreSQL
>> doesn't have primary key constraint, or why you have such records in
>> the beginning.
>>
>> Current PutSQL does not report the cause of batch update failure well.
>> But that behavior has been improved and you can see what is the cause
>> if you can use NiFi 1.4.0-SNAPSHOT (you need to build NiFi from source
>> code to try it).
>> https://issues.apache.org/jira/browse/NIFI-4162
>>
>> Please refer NiFi README.md for how to build and run NiFi from source
>> code.
>> https://github.com/apache/nifi
>>
>> Also, in order to put Avro data to an RDBMS, NiFi also has
>> PutDatabaseRecord processor today. Which can work more efficiently
>> because you don't have to use 'split avro -> avrotojson -> jsontosql'
>> part, PutDatabaseRecord can directly execute DML statement from Avro
>> dataset.
>> https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.3.0/org.apache.nifi.processors.standard.PutDatabaseRecord/index.html
>>
>> Thanks,
>> Koji
>>
>> On Tue, Sep 19, 2017 at 9:21 AM, Vikram More 
>> wrote:
>> > Hi Everyone,
>> >
>> > I am new to NiFi and community :)
>> >
>> > I am trying to build a Nifi flow which will pull from Oracle table and
>> load
>> > into Postgres table. My select query has two columns and I need to
>> remove
>> > duplicates based on these two columns. Can I remove duplicates in Nifi
>> based
>> > on two column data values. My flow is like below -
>> > ExecuteSQL -> split avro -> avrotojson -> jsontosql -> PutSQL
>> >
>> >
>> > PutSQL question : Oracle table has ~ 4 million records and when the
>> PutSQL
>> > was running , it gave several similar errors :
>> >
>> > "Failed to update database due to failed batch update. There were total
>> of 1
>> > FlowFiles that failed, 5 that successful, and 9 that were not execute
>> and
>> > will be routed to retry"
>> >
>> > Why might be wrong in PutSQL ? have kept PutSQL batch size of 1000 and
>> don't
>> > have any primary key constraint on postgres table.
>> > (Should I create primary key with those two columns, so while loading
>> it can
>> > reject duplicate records, but will it rejects the complete batch rather
>> than
>> > just duplicates ?)
>> >
>> > Would be great if someone can provide insight in this scenario ?
>> >
>> > Thanks,
>> > Vikram
>>
>
>


Re: Removing duplicates from data

2017-09-18 Thread Vikram More
Hi Koji,
Thanks for response and helpful links !

NiFi version : 1.1.0.2.1.2.0-10

I am trying to move data from an operational system (Oracle DB) to an
analytical system (Postgres DB). The Postgres table was modeled/designed by us
(and we can add a primary key). Data from Oracle looks like the table below (I
need to remove duplicate records for the combination of ColA and ColB):

Col A   Col B
C1      item 1
C1      item 2
*C2*    *item 3*
*C2*    *item 4*
*C2*    *item 3*
C3      item 1
C4      null
C5      item 5
C6      item 7
I will try to explore the PutDatabaseRecord processor and see if I can
achieve the desired purpose.
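For reference, deduplication on the (ColA, ColB) combination can be sketched as follows (within a flow, DetectDuplicate keyed on a composite attribute of the two columns, or SELECT DISTINCT in the source query, are common options):

```python
def dedupe(records, keys=("ColA", "ColB")):
    """Keep only the first record seen for each (ColA, ColB) combination."""
    seen = set()
    unique = []
    for rec in records:
        key = tuple(rec[k] for k in keys)
        if key not in seen:
            seen.add(key)
            unique.append(rec)
    return unique

rows = [{"ColA": "C2", "ColB": "item 3"},
        {"ColA": "C2", "ColB": "item 4"},
        {"ColA": "C2", "ColB": "item 3"}]  # last row duplicates the first
print(len(dedupe(rows)))  # 2
```

This is only a sketch of the logic, not a NiFi processor; it also keeps rows where a key column is null, treating null as an ordinary value.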

Thanks,
Vikram

On Mon, Sep 18, 2017 at 9:59 PM, Koji Kawamura 
wrote:

> Hello Vikram,
>
> Welcome to NiFi and the community :)
>
> Would you elaborate your data flow? And which version you are using?
> For example, can you share some input data extracted from Oracle? I
> wonder why you need to remove duplicate records while PostgreSQL
> doesn't have primary key constraint, or why you have such records in
> the beginning.
>
> Current PutSQL does not report the cause of batch update failure well.
> But that behavior has been improved and you can see what is the cause
> if you can use NiFi 1.4.0-SNAPSHOT (you need to build NiFi from source
> code to try it).
> https://issues.apache.org/jira/browse/NIFI-4162
>
> Please refer NiFi README.md for how to build and run NiFi from source code.
> https://github.com/apache/nifi
>
> Also, in order to put Avro data to an RDBMS, NiFi also has
> PutDatabaseRecord processor today. Which can work more efficiently
> because you don't have to use 'split avro -> avrotojson -> jsontosql'
> part, PutDatabaseRecord can directly execute DML statement from Avro
> dataset.
> https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.3.0/org.apache.nifi.processors.standard.PutDatabaseRecord/index.html
>
> Thanks,
> Koji
>
> On Tue, Sep 19, 2017 at 9:21 AM, Vikram More 
> wrote:
> > Hi Everyone,
> >
> > I am new to NiFi and community :)
> >
> > I am trying to build a Nifi flow which will pull from Oracle table and
> load
> > into Postgres table. My select query has two columns and I need to remove
> > duplicates based on these two columns. Can I remove duplicates in Nifi
> based
> > on two column data values. My flow is like below -
> > ExecuteSQL -> split avro -> avrotojson -> jsontosql -> PutSQL
> >
> >
> > PutSQL question : Oracle table has ~ 4 million records and when the
> PutSQL
> > was running , it gave several similar errors :
> >
> > "Failed to update database due to failed batch update. There were total
> of 1
> > FlowFiles that failed, 5 that successful, and 9 that were not execute and
> > will be routed to retry"
> >
> > Why might be wrong in PutSQL ? have kept PutSQL batch size of 1000 and
> don't
> > have any primary key constraint on postgres table.
> > (Should I create primary key with those two columns, so while loading it
> can
> > reject duplicate records, but will it rejects the complete batch rather
> than
> > just duplicates ?)
> >
> > Would be great if someone can provide insight in this scenario ?
> >
> > Thanks,
> > Vikram
>


Re: Removing duplicates from data

2017-09-18 Thread Koji Kawamura
Hello Vikram,

Welcome to NiFi and the community :)

Could you elaborate on your data flow, and which version are you using?
For example, can you share some input data extracted from Oracle? I
wonder why you need to remove duplicate records while the PostgreSQL
table doesn't have a primary key constraint, and why you have such
records in the first place.

The current PutSQL does not report the cause of a batch update failure
well, but that behavior has been improved: you can see the cause if
you use NiFi 1.4.0-SNAPSHOT (you need to build NiFi from source code
to try it).
https://issues.apache.org/jira/browse/NIFI-4162

Please refer to the NiFi README.md for how to build and run NiFi from source code.
https://github.com/apache/nifi

Also, to put Avro data into an RDBMS, NiFi now has the
PutDatabaseRecord processor, which can work more efficiently: you
don't need the 'split avro -> avrotojson -> jsontosql' part, because
PutDatabaseRecord can execute DML statements directly from the Avro
dataset.
https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.3.0/org.apache.nifi.processors.standard.PutDatabaseRecord/index.html
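Conceptually, what PutDatabaseRecord does per batch (a rough sketch, not NiFi's actual Java implementation) is derive one parameterized DML statement from the record schema and bind each record's values:

```python
def records_to_batch_insert(table, records):
    """Derive one parameterized INSERT and the batch of value tuples from
    a list of record dicts (as a record reader would produce them)."""
    if not records:
        return None, []
    cols = sorted(records[0])
    sql = "INSERT INTO {} ({}) VALUES ({})".format(
        table, ", ".join(cols), ", ".join("?" for _ in cols))
    params = [tuple(rec[c] for c in cols) for rec in records]
    return sql, params

sql, params = records_to_batch_insert(
    "items", [{"col_a": "C1", "col_b": "item 1"}])
print(sql)  # INSERT INTO items (col_a, col_b) VALUES (?, ?)
```

In practice PutDatabaseRecord resolves columns against the table metadata and the configured record reader; the sketch only illustrates why the per-record split and JSON conversion steps become unnecessary.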

Thanks,
Koji

On Tue, Sep 19, 2017 at 9:21 AM, Vikram More  wrote:
> Hi Everyone,
>
> I am new to NiFi and community :)
>
> I am trying to build a Nifi flow which will pull from Oracle table and load
> into Postgres table. My select query has two columns and I need to remove
> duplicates based on these two columns. Can I remove duplicates in Nifi based
> on two column data values. My flow is like below -
> ExecuteSQL -> split avro -> avrotojson -> jsontosql -> PutSQL
>
>
> PutSQL question : Oracle table has ~ 4 million records and when the PutSQL
> was running , it gave several similar errors :
>
> "Failed to update database due to failed batch update. There were total of 1
> FlowFiles that failed, 5 that successful, and 9 that were not execute and
> will be routed to retry"
>
> Why might be wrong in PutSQL ? have kept PutSQL batch size of 1000 and don't
> have any primary key constraint on postgres table.
> (Should I create primary key with those two columns, so while loading it can
> reject duplicate records, but will it rejects the complete batch rather than
> just duplicates ?)
>
> Would be great if someone can provide insight in this scenario ?
>
> Thanks,
> Vikram


Removing duplicates from data

2017-09-18 Thread Vikram More
Hi Everyone,

I am new to NiFi and the community :)

I am trying to build a NiFi flow that pulls from an Oracle table and loads
into a Postgres table. My select query has two columns, and I need to remove
duplicates based on these two columns. Can I remove duplicates in NiFi
based on two column data values? My flow looks like this:
ExecuteSQL -> split avro -> avrotojson -> jsontosql -> PutSQL


PutSQL question: the Oracle table has ~4 million records, and while PutSQL
was running it gave several similar errors:

"Failed to update database due to failed batch update. There were total of
1 FlowFiles that failed, 5 that successful, and 9 that were not execute and
will be routed to retry"

What might be wrong in PutSQL? I have kept the PutSQL batch size at 1000 and
don't have any primary key constraint on the Postgres table.
(Should I create a primary key on those two columns so that loading can
reject duplicate records? But would it reject the complete batch rather
than just the duplicates?)
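On the primary-key question: depending on the JDBC driver and settings, a duplicate-key failure inside a batch may abort more than the offending row, so the worry is reasonable. PostgreSQL (9.5+) can be told to skip duplicates instead with INSERT ... ON CONFLICT DO NOTHING. A sketch using SQLite's equivalent INSERT OR IGNORE so it runs standalone:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE items (col_a TEXT, col_b TEXT, PRIMARY KEY (col_a, col_b))")

rows = [("C2", "item 3"), ("C2", "item 4"), ("C2", "item 3")]  # one duplicate
# PostgreSQL equivalent (9.5+):
#   INSERT INTO items (col_a, col_b) VALUES (%s, %s) ON CONFLICT DO NOTHING
conn.executemany("INSERT OR IGNORE INTO items VALUES (?, ?)", rows)
conn.commit()

count = conn.execute("SELECT COUNT(*) FROM items").fetchone()[0]
print(count)  # 2: only the duplicate row is skipped, not the whole batch
```

Whether PutSQL's generated statements can carry an ON CONFLICT clause depends on the flow, so this is an illustration of the database-side behavior, not a drop-in NiFi configuration.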

It would be great if someone could provide insight into this scenario.

Thanks,
Vikram


Re: PutTCP connector not cleaning up dangling connections

2017-09-18 Thread Joe Witt
Davy

If you could give the PR a try and see if it helps, I'd be happy to
help get it reviewed and in for 1.4 if the timing works out.

Thanks

On Mon, Sep 18, 2017 at 4:39 PM, Bryan Bende  wrote:
> Davy,
>
> I just pushed a second commit to the PR that will log the port from
> the local address of the socket being used by the sender, which I
> think is what you mean by the client port.
>
> If you turn on debug for PutTCP you will see something like...
>
> o.apache.nifi.processors.standard.PutTCP
> PutTCP[id=95463da0-015e-1000-8723-0a2baf6c832f] Connected to local
> port 57280
>
> I only performed the logging after successful connection because I'm
> not sure the ramifications of trying to obtain the local SocketAddress
> while finishConnect is still returning false, so I guess the logging
> wouldn't really help in the case where the exception was thrown.
>
> https://github.com/apache/nifi/pull/2159
>
> -Bryan
>
>
> On Mon, Sep 18, 2017 at 2:18 PM, ddewaele  wrote:
>> Thx a lot for the quick response. Looking forward to the PR and the
>> release :)
>>
>> Would this for example still make the 1.4.0 release ?
>>
>> It would also be very interesting to log client ports in debug mode 
>> don't know how easy that is with nio.
>>
>> There is Keep Alive Timeout = 2min specified on the Moxa, so it means that
>> the socket on the client (NiFi) is still responding to "keep alive" packets.
>> (makes sense I guess, as we would need to configure some kind of read
>> timeout on the moxa to kill off the client).
>>
>> I guess the fact that we don't see anything in the stack is because the
>> socket got established in non blocking mode so it is in ESTABLISHED mode but
>> nobody is around to do any processing on it.
>>
>> --
>> Sent from: http://apache-nifi-users-list.2361937.n4.nabble.com/


Re: PutTCP connector not cleaning up dangling connections

2017-09-18 Thread Bryan Bende
Davy,

I just pushed a second commit to the PR that will log the port from
the local address of the socket being used by the sender, which I
think is what you mean by the client port.

If you turn on debug for PutTCP you will see something like...

o.apache.nifi.processors.standard.PutTCP
PutTCP[id=95463da0-015e-1000-8723-0a2baf6c832f] Connected to local
port 57280

I only added the logging after a successful connection because I'm
not sure of the ramifications of obtaining the local SocketAddress
while finishConnect is still returning false; so the logging
wouldn't really help in the case where the exception was thrown.

https://github.com/apache/nifi/pull/2159

-Bryan


On Mon, Sep 18, 2017 at 2:18 PM, ddewaele  wrote:
> Thx a lot for the quick response. Looking forward to the PR and the
> release :)
>
> Would this for example still make the 1.4.0 release ?
>
> It would also be very interesting to log client ports in debug mode 
> don't know how easy that is with nio.
>
> There is Keep Alive Timeout = 2min specified on the Moxa, so it means that
> the socket on the client (NiFi) is still responding to "keep alive" packets.
> (makes sense I guess, as we would need to configure some kind of read
> timeout on the moxa to kill off the client).
>
> I guess the fact that we don't see anything in the stack is because the
> socket got established in non blocking mode so it is in ESTABLISHED mode but
> nobody is around to do any processing on it.
>
> --
> Sent from: http://apache-nifi-users-list.2361937.n4.nabble.com/


Re: PutTCP connector not cleaning up dangling connections

2017-09-18 Thread ddewaele
Thx a lot for the quick response. Looking forward to the PR and the
release :)

Would this, for example, still make the 1.4.0 release?

It would also be very interesting to log client ports in debug mode;
I don't know how easy that is with NIO.

There is a Keep Alive Timeout = 2min specified on the Moxa, so the
socket on the client (NiFi) side is still responding to "keep alive" packets
(makes sense, I guess, as we would need to configure some kind of read
timeout on the Moxa to kill off the client).

I guess the fact that we don't see anything in the stack trace is because the
socket was established in non-blocking mode, so it is in the ESTABLISHED
state but nobody is around to do any processing on it.

--
Sent from: http://apache-nifi-users-list.2361937.n4.nabble.com/


Re: Re: Re: QueryDatabaseTable - Deleted Records

2017-09-18 Thread Matt Burgess
Uwe,

Is there anything in the V$ARCHIVED_LOG table [1] in your source
database? If so you may be able to get some of that information from
there. Also is LogMiner [2] enabled on the database? That's another
way to be able to query the logs to get things like deletes.

In general, there has to be something in the system that knows when
something is deleted. That can be done in a few ways, with varying
amounts of success:

1) Cross-join the source table with the target table. This won't pick
up records that have been inserted, updated, then deleted since the
last time the join was performed. Plus the join is very costly for
large tables.
2) Change the schema of the tables to add a soft delete flag (as
mentioned in an earlier reply). This often cannot be done because the
main app needs a particular schema, or because the CDC user does not
have permission to do such things to the source DB
3) Intercept the calls that will change the DB. This is fragile
because you may not know if the call succeeds at the DB. Plus you may
not be able to change the architecture to put something in between the
users and the DB.
4) Interrogate the logs. This is IMO the only real way to do CDC, and
is how most CDC solutions operate. The CaptureChangeMySQL processor
reads the MySQL binary logs, and if/when the CaptureChangeOracle
processor is implemented, it will likely require LogMiner or something
to be enabled at the source to make the information available.  This
pattern can apply to all DBs that support such a thing, as long as
they can be interrogated via JDBC (SQL queries) or some client tool
(which hopefully for us has a Java port!)
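For illustration, the snapshot-comparison idea (a variant of approach 1, also described later in the thread) can be sketched as:

```python
import hashlib

def digest_rows(rows):
    """Map a stable per-row digest to the row itself; rows are tuples."""
    return {hashlib.sha256(repr(r).encode()).hexdigest(): r for r in rows}

def diff_snapshots(old_rows, new_rows):
    """Compare two full-table snapshots: rows present only in the old
    snapshot were deleted; rows present only in the new were inserted."""
    old, new = digest_rows(old_rows), digest_rows(new_rows)
    inserted = [new[h] for h in new.keys() - old.keys()]
    deleted = [old[h] for h in old.keys() - new.keys()]
    return inserted, deleted

ins, dels = diff_snapshots([(1, "a"), (2, "b")], [(2, "b"), (3, "c")])
```

Note that an updated row shows up as one insert plus one delete; matching the pair back into an "update" requires a row key. That cost, plus taking full snapshots of large tables, is why log-based CDC (approach 4) is usually preferred.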

Regards,
Matt

[1] 
https://docs.oracle.com/cd/B28359_01/server.111/b28320/dynviews_1016.htm#REFRN30011
[2] 
https://docs.oracle.com/cd/E11882_01/server.112/e22490/logminer.htm#SUTIL1559

On Mon, Sep 18, 2017 at 1:35 PM, Uwe Geercken  wrote:
> Andrew,
>
> yes. we are doing the same for the oracle db which is quite old and does not
> provide this information.
>
> Anyway. Was just curious if somebody has a smarter solution. The blogs of
> Nifi and Kafka have really good samples of extracting data but none of them
> touches the topic of deletes.
>
> Rgds,
>
> Uwe
>
>
>
> Gesendet: Samstag, 16. September 2017 um 13:52 Uhr
> Von: "Andrew Grande" 
> An: users@nifi.apache.org
> Betreff: Re: Re: QueryDatabaseTable - Deleted Records
>
> As an interesting architectural approach we took eons ago, before NiFi, was
> to take daily snapshots of a full table. Every row would then be
> hashed/digested or in any other way uniquely identified and 2 datasets would
> be crossed and compared to find inserts/deletes/updates. It was involved,
> but worked.
>
> Andrew
>
>
> On Sat, Sep 16, 2017, 2:38 AM Uwe Geercken  wrote:
>>
>> Bryan,
>>
>> yes, the change log would be possible. In my use case I have Oracle 11 as
>> the source - and I can not change the source easily (takes long - is
>> expensive).
>>
>> I was expecting this answer but wanted to make sure that I have not missed
>> anything. I will try to build my use case around something else then.
>>
>> Thanks for your response(s).
>>
>> Rgds,
>>
>> Uwe
>>
>> Gesendet: Freitag, 15. September 2017 um 16:15 Uhr
>> Von: "Bryan Bende" 
>> An: users@nifi.apache.org
>> Betreff: Re: QueryDatabaseTable - Deleted Records
>> Uwe,
>>
>> Typically you need to process the change log of the database in this
>> case, which unfortunately usually becomes database specific.
>>
>> I believe we have a processor CaptureChangeMySQL that can process the
>> MySQL change log.
>>
>> -Bryan
>>
>>
>> On Tue, Sep 12, 2017 at 1:39 PM, Uwe Geercken  wrote:
>> > Hello,
>> >
>> > apparently the QueryDatabaseTable processor catches changes made to the
>> > data
>> > of the source database - updates and inserts.
>> >
>> > Has anybody a good idea or strategy how to handle deletes in the source
>> > database? Of course one could flag a record as deleted instead of
>> > phisically
>> > deleting it. But this means changing the source system in many cases and
>> > that is sometimes not possible. And yes, if you process the change log
>> > (if
>> > available) of the source system that is also a good option.
>> >
>> > Would be greatful for any tips or a best practive of how you do it.
>> >
>> > Rgds,
>> >
>> > Uwe


Aw: Re: Re: QueryDatabaseTable - Deleted Records

2017-09-18 Thread Uwe Geercken

Andrew,

yes, we are doing the same for the Oracle DB, which is quite old and does not provide this information.

Anyway, I was just curious whether somebody has a smarter solution. The blogs about NiFi and Kafka have really good samples of extracting data, but none of them touches the topic of deletes.

Rgds,

Uwe

Gesendet: Samstag, 16. September 2017 um 13:52 Uhr
Von: "Andrew Grande" 
An: users@nifi.apache.org
Betreff: Re: Re: QueryDatabaseTable - Deleted Records

As an interesting architectural approach we took eons ago, before NiFi: take daily snapshots of a full table. Every row would then be hashed/digested or in some other way uniquely identified, and the two datasets would be crossed and compared to find inserts/deletes/updates. It was involved, but it worked.

Andrew

On Sat, Sep 16, 2017, 2:38 AM Uwe Geercken  wrote:

Bryan,

yes, the change log would be possible. In my use case I have Oracle 11 as the source - and I cannot change the source easily (it takes long and is expensive).

I was expecting this answer but wanted to make sure that I had not missed anything. I will try to build my use case around something else then.

Thanks for your response(s).

Rgds,

Uwe

Gesendet: Freitag, 15. September 2017 um 16:15 Uhr
Von: "Bryan Bende" 
An: users@nifi.apache.org
Betreff: Re: QueryDatabaseTable - Deleted Records

Uwe,

Typically you need to process the change log of the database in this
case, which unfortunately usually becomes database specific.

I believe we have a processor CaptureChangeMySQL that can process the
MySQL change log.

-Bryan

On Tue, Sep 12, 2017 at 1:39 PM, Uwe Geercken  wrote:
> Hello,
>
> apparently the QueryDatabaseTable processor catches changes made to the data
> of the source database - updates and inserts.
>
> Has anybody a good idea or strategy how to handle deletes in the source
> database? Of course one could flag a record as deleted instead of physically
> deleting it. But this means changing the source system in many cases, and
> that is sometimes not possible. And yes, if you process the change log (if
> available) of the source system, that is also a good option.
>
> Would be grateful for any tips or a best practice of how you do it.
>
> Rgds,
>
> Uwe


Re: Manual release of flows

2017-09-18 Thread Bryan Bende
Hello,

You should be able to have a ListenHTTP processor (or
HandleHttpRequest/Response) that connects to a Notify processor which
would release the flow files sitting in front of Wait.

Thanks,

Bryan
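The admin's manual action is then just an HTTP POST to the ListenHTTP port. A sketch ('contentListener' is ListenHTTP's default Base Path; the payload and any correlation details are assumptions to adapt to the flow):

```python
import urllib.request

def send_release_signal(base_url, signal_id):
    """POST a release signal to a ListenHTTP endpoint. Downstream,
    ListenHTTP -> Notify updates the cache entry that the Wait
    processor is watching, releasing the held flow files."""
    req = urllib.request.Request(
        f"{base_url}/contentListener",  # ListenHTTP's default Base Path
        data=signal_id.encode(),
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=10) as resp:
        return resp.status
```

The Notify processor would be configured to read the signal identifier (e.g. from a flow file attribute set by ListenHTTP) so it matches the Release Signal Identifier on the Wait side.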


On Sun, Sep 17, 2017 at 12:19 PM, joe harvyy  wrote:
> Hi,
>
> I have a use case where flow files should be blocked before moving to the
> next step. A manual action is needed to release these flow files. This
> manual action will be an HTTP post that an admin will be sending to NiFi
> through a simple web UI.
>
> How can I implement this behavior? Can I use Wait/Notify in this case or is
> there a better approach?
>
> Thanks
> J.
>


Re: PutTCP connector not cleaning up dangling connections

2017-09-18 Thread Bryan Bende
I just created a JIRA and will put up a PR shortly:

https://issues.apache.org/jira/browse/NIFI-4391

The processor catches the exception while attempting to obtain a
connection, then logs an error and transfers the flow file to failure,
which is where we see this message:
2017-09-17 14:20:20,860 ERROR [Timer-Driven Process Thread-10]
o.apache.nifi.processors.standard.PutTCP
PutTCP[id=80231a39-1008-1159-a6fa-1f9e3751d608] No available connections,
and unable to create a new one, transferring

https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java#L330-L347



On Mon, Sep 18, 2017 at 9:22 AM, Joe Witt  wrote:
> Good catch.  Can you please be sure to cover in a JIRA?
>
> That said, wouldn't we see that in the stack trace during the
> problematic condition?
>
> On Mon, Sep 18, 2017 at 9:16 AM, Bryan Bende  wrote:
>> The code in SocketChannelSender that Davy pointed out could definitely
>> be the problem...
>>
>> It makes a non-blocking channel and calls connect, then goes into a
>> loop waiting for finishConnect() to return true, but if that doesn't
>> happen before the configured timeout, then it throws an exception, but
>> it doesn't first close the channel, and the processor also doesn't
>> close the sender.
>>
>> As an example comparison, this code which is being used by PutTCP:
>>
>> https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/SocketChannelSender.java#L45-L78
>>
>> Should be doing something like this where it catches, closes, and rethrows:
>>
>> https://github.com/apache/nifi/blob/master/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java#L140-L179
>>
>>
>> On Mon, Sep 18, 2017 at 9:09 AM, Joe Witt  wrote:
>>> Davy
>>>
>>> Interesting.  So in looking through the stack trace I don't see
>>> anything related to sockets nifi has initiated to another service and
>>> nothing for PutTCP.  I'm not saying that means there is nothing but
>>> the stack traces only show the custom GetTCP processors.
>>>
>>> You can use netstat to show open sockets from the nifi process.  Can
>>> you try that and share those?
>>>
>>> Does the NiFi UI show the processor as having stuck threads?  I'm
>>> guessing not since there is nothing in the stack traces.
>>>
>>> Thanks
>>>
>>> On Mon, Sep 18, 2017 at 1:54 AM, ddewaele  wrote:
 Stopping the processor doesn't cleanup the tcp connection. It remains
 ESTABLISHED.

 There are 2 ways of getting out of it (none of them are ideal).

 - Restarting Nifi
 - Restarting the Moxa serial ports

 I've dumped the output in the following gist :
 https://gist.github.com/ddewaele/83705003740674962c1e133fb617f68c

 The GetTCP processor you'll see in the thread dump also interacts with the
 moxa. It is a Netty based custom processor we created (because there was no
 GetTCP at the time). However, we log all interactions (including client
 ports) with this processor and all of them end up getting closed correctly.

 So the "hanging" connection originated from the built-in PutTCP processor.


 Joe Witt wrote
> If you stop the processor manually does it clean them up?
>
> When the connections appear stuck can you please get a thread dump?
>
> bin/nifi.sh dump
>
> The results end up in bootstrap.log.
>
> Thanks
> Joe
>
> On Sep 17, 2017 2:22 PM, "ddewaele"  wrote:
>
>> We are using NiFi PutTCP processors to send messages to a number of Moxa
>> onCell ip gateway devices.
>>
>> These Moxa devices are running on a cellular network with not always the
>> most ideal connection. The Moxa only allows for a maximum of 2
>> simultaneous
>> client connections.
>>
>> What we notice is that although we specify connection / read timeouts on
>> both PutTCP and the Moxa, that sometimes a connection get "stuck". (In
>> the
>> moxa network monitoring we see 2 client sockets coming from PutTCP in the
>> ESTABLISHED state that never go away).
>>
>> This doesn't always happen, but often enough for it to be considered a
>> problem, as it requires a restart of the moxa ports to clear the
>> connections
>> (manual step). It typically happens when PutTCP experiences a Timeout.
>>
>> On the PutTCP processors we have the following settings :
>>
>> - Idle Connection Expiration : 30 seconds  (we've set this higher due to
>> bad
>> gprs connection)
>> - Timeout : 10 seconds (this is only used as a timeout for establishing
>> the
>> connection)
>>
>> On the Moxas we have
>>
>> - TCP alive check time : 2min (this should force the Moxa to close the
>> socket)

Re: PutTCP connector not cleaning up dangling connections

2017-09-18 Thread Joe Witt
Good catch.  Can you please be sure to cover in a JIRA?

That said, wouldn't we see that in the stack trace during the
problematic condition?

On Mon, Sep 18, 2017 at 9:16 AM, Bryan Bende  wrote:
> The code in SocketChannelSender that Davy pointed out could definitely
> be the problem...
>
> It makes a non-blocking channel and calls connect, then goes into a
> loop waiting for finishConnect() to return true, but if that doesn't
> happen before the configured timeout, then it throws an exception, but
> it doesn't first close the channel, and the processor also doesn't
> close the sender.
>
> As an example comparison, this code which is being used by PutTCP:
>
> https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/SocketChannelSender.java#L45-L78
>
> Should be doing something like this where it catches, closes, and rethrows:
>
> https://github.com/apache/nifi/blob/master/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java#L140-L179
>
>
> On Mon, Sep 18, 2017 at 9:09 AM, Joe Witt  wrote:
>> Davy
>>
>> Interesting.  So in looking through the stack trace I don't see
>> anything related to sockets nifi has initiated to another service and
>> nothing for PutTCP.  I'm not saying that means there is nothing but
>> the stack traces only show the custom GetTCP processors.
>>
>> You can use netstat to show open sockets from the nifi process.  Can
>> you try that and share those?
>>
>> Does the NiFi UI show the processor as having stuck threads?  I'm
>> guessing not since there is nothing in the stack traces.
>>
>> Thanks
>>
>> On Mon, Sep 18, 2017 at 1:54 AM, ddewaele  wrote:
>>> Stopping the processor doesn't cleanup the tcp connection. It remains
>>> ESTABLISHED.
>>>
>>> There are 2 ways of getting out of it (none of them are ideal).
>>>
>>> - Restarting Nifi
>>> - Restarting the Moxa serial ports
>>>
>>> I've dumped the output in the following gist :
>>> https://gist.github.com/ddewaele/83705003740674962c1e133fb617f68c
>>>
>>> The GetTCP processor you'll see in the thread dump also interacts with the
>>> moxa. It is a Netty based custom processor we created (because there was no
>>> GetTCP at the time). However, we log all interactions (including client
>>> ports) with this processor and all of them end up getting closed correctly.
>>>
>>> So the "hanging" connection originated from the built-in PutTCP processor.
>>>
>>>
>>> Joe Witt wrote
 If you stop the processor manually does it clean them up?

 When the connections appear stuck can you please get a thread dump?

 bin/nifi.sh dump

 The results end up in bootstrap.log.

 Thanks
 Joe

 On Sep 17, 2017 2:22 PM, "ddewaele"  wrote:

> We are using NiFi PutTCP processors to send messages to a number of Moxa
> onCell ip gateway devices.
>
> These Moxa devices are running on a cellular network with not always the
> most ideal connection. The Moxa only allows for a maximum of 2
> simultaneous
> client connections.
>
> What we notice is that although we specify connection / read timeouts on
> both PutTCP and the Moxa, that sometimes a connection get "stuck". (In
> the
> moxa network monitoring we see 2 client sockets coming from PutTCP in the
> ESTABLISHED state that never go away).
>
> This doesn't always happen, but often enough for it to be considered a
> problem, as it requires a restart of the moxa ports to clear the
> connections
> (manual step). It typically happens when PutTCP experiences a Timeout.
>
> On the PutTCP processors we have the following settings :
>
> - Idle Connection Expiration : 30 seconds  (we've set this higher due to
> bad
> gprs connection)
> - Timeout : 10 seconds (this is only used as a timeout for establishing
> the
> connection)
>
> On the Moxas we have
>
> - TCP alive check time : 2min (this should force the Moxa to close the
> socket)
>
> Yet for some reason the connection remains established.
>
> Here's what I found out :
>
> On the moxa I noticed a connection (with client port 48440) that is in
> ESTABLISHED mode for 4+ hours. (blocking other connections). On the Moxa
> I
> can see when the connection was established :
>
> 2017/09/17 14:20:29 [OpMode] Port01 Connect 10.192.2.90:48440
>
> I can track that down in Nifi via the logs (unfortunately PutTCP doesn't
> log
> client ports, but from the timestamp  I'm sure it's this connection :
>
> 2017-09-17 14:20:10,837 DEBUG [Timer-Driven Process Thread-10]
> o.apache.nifi.processors.standard.PutTCP
> PutTCP[id=80231a39-1008-1159-a6fa-1f9e3751d608] No available connections,
> creating a new one...
> 2017-09-17 14:20:20,860 ERROR [Timer-Driven Pr

Re: PutTCP connector not cleaning up dangling connections

2017-09-18 Thread Bryan Bende
The code in SocketChannelSender that Davy pointed out could definitely
be the problem...

It creates a non-blocking channel and calls connect, then loops waiting
for finishConnect() to return true. If that doesn't happen before the
configured timeout, it throws an exception without first closing the
channel, and the processor never closes the sender either, so the
half-open connection is leaked.

For comparison, this is the code currently used by PutTCP:

https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/SocketChannelSender.java#L45-L78

It should instead do something like the following, which catches, closes, and rethrows:

https://github.com/apache/nifi/blob/master/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java#L140-L179



Re: PutTCP connector not cleaning up dangling connections

2017-09-18 Thread Joe Witt
Davy

Interesting.  Looking through the stack trace, I don't see anything
related to sockets NiFi has initiated to another service, and nothing
for PutTCP.  That doesn't necessarily mean there is nothing, but the
stack traces only show the custom GetTCP processors.

You can use netstat to show the open sockets from the NiFi process.
Can you try that and share the output?

Does the NiFi UI show the processor as having stuck threads?  I'm
guessing not since there is nothing in the stack traces.

Thanks

On Mon, Sep 18, 2017 at 1:54 AM, ddewaele  wrote:
> Stopping the processor doesn't clean up the TCP connection. It remains
> ESTABLISHED.
>
> There are two ways of getting out of it (neither of them ideal):
>
> - Restarting NiFi
> - Restarting the Moxa serial ports
>
> I've dumped the output in the following gist:
> https://gist.github.com/ddewaele/83705003740674962c1e133fb617f68c
>
> The GetTCP processor you'll see in the thread dump also interacts with the
> Moxa. It is a Netty-based custom processor we created (because there was no
> GetTCP at the time). However, we log all interactions (including client
> ports) with this processor, and all of them end up getting closed correctly.
>
> So the "hanging" connection originated from the built-in PutTCP processor.
>
>
> Joe Witt wrote
>> If you stop the processor manually does it clean them up?
>>
>> When the connections appear stuck can you please get a thread dump?
>>
>> bin/nifi.sh dump
>>
>> The results end up in bootstrap.log.
>>
>> Thanks
>> Joe
>>
>> On Sep 17, 2017 2:22 PM, "ddewaele" <ddewaele@> wrote:
>>
>>> We are using NiFi PutTCP processors to send messages to a number of Moxa
>>> onCell IP gateway devices.
>>>
>>> These Moxa devices run on a cellular network with a far-from-ideal
>>> connection. The Moxa only allows a maximum of 2 simultaneous client
>>> connections.
>>>
>>> What we notice is that although we specify connection/read timeouts on
>>> both PutTCP and the Moxa, sometimes a connection gets "stuck". (In the
>>> Moxa network monitoring we see 2 client sockets coming from PutTCP in the
>>> ESTABLISHED state that never go away.)
>>>
>>> This doesn't always happen, but often enough to be considered a problem,
>>> as it requires a restart of the Moxa ports to clear the connections (a
>>> manual step). It typically happens when PutTCP experiences a timeout.
>>>
>>> On the PutTCP processors we have the following settings:
>>>
>>> - Idle Connection Expiration: 30 seconds (we set this high due to the
>>> bad GPRS connection)
>>> - Timeout: 10 seconds (this is only used as a timeout for establishing
>>> the connection)
>>>
>>> On the Moxas we have:
>>>
>>> - TCP alive check time: 2 min (this should force the Moxa to close the
>>> socket)
>>>
>>> Yet for some reason the connection remains established.
>>>
>>> Here's what I found out:
>>>
>>> On the Moxa I noticed a connection (with client port 48440) that had been
>>> in the ESTABLISHED state for 4+ hours, blocking other connections. On the
>>> Moxa I can see when the connection was established:
>>>
>>> 2017/09/17 14:20:29 [OpMode] Port01 Connect 10.192.2.90:48440
>>>
>>> I can track that down in NiFi via the logs (unfortunately PutTCP doesn't
>>> log client ports, but from the timestamp I'm sure it's this connection):
>>>
>>> 2017-09-17 14:20:10,837 DEBUG [Timer-Driven Process Thread-10]
>>> o.apache.nifi.processors.standard.PutTCP
>>> PutTCP[id=80231a39-1008-1159-a6fa-1f9e3751d608] No available connections,
>>> creating a new one...
>>> 2017-09-17 14:20:20,860 ERROR [Timer-Driven Process Thread-10]
>>> o.apache.nifi.processors.standard.PutTCP
>>> PutTCP[id=80231a39-1008-1159-a6fa-1f9e3751d608] No available connections,
>>> and unable to create a new one, transferring
>>> StandardFlowFileRecord[uuid=79f2a166-5211-4d2d-9275-03f0ce4d5b29,claim=
>>> StandardContentClaim
>>> [resourceClaim=StandardResourceClaim[id=1505641210025-1,
>>> container=default,
>>> section=1], offset=84519, length=9],offset=0,name=
>>> 23934743676390659,size=9]
>>> to failure: java.net.SocketTimeoutException: Timed out connecting to
>>> 10.32.133.40:4001
>>> 2017-09-17 14:20:20,860 ERROR [Timer-Driven Process Thread-10]
>>> o.apache.nifi.processors.standard.PutTCP
>>> java.net.SocketTimeoutException: Timed out connecting to
>>> 10.32.133.40:4001
>>> at
>>> org.apache.nifi.processor.util.put.sender.SocketChannelSender.open(
>>> SocketChannelSender.java:66)
>>> ~[nifi-processor-utils-1.1.0.jar:1.1.0]
>>> at
>>> org.apache.nifi.processor.util.put.AbstractPutEventProcessor.createSender(
>>> AbstractPutEventProcessor.java:312)
>>> ~[nifi-processor-utils-1.1.0.jar:1.1.0]
>>> at
>>> org.apache.nifi.processors.standard.PutTCP.createSender(PutTCP.java:121)
>>> [nifi-standard-processors-1.1.0.jar:1.1.0]
>>> at
>>> org.apache.nifi.processor.util.put.AbstractPutEventProcessor.
>>> acquireSender(AbstractPutEventProcessor.java:334)
>>> ~[nifi-proces

Re: org.apache.nifi.spark.NiFiReceiver and spark spark.streaming.receiver.maxRate

2017-09-18 Thread Andrew Grande
A typical production setup is to use Kafka in the middle.

Andrew



org.apache.nifi.spark.NiFiReceiver and spark spark.streaming.receiver.maxRate

2017-09-18 Thread Margus Roo

Hi

I need to consume a flow with Spark Streaming from a NiFi port. As we know,
Spark supports spark.streaming.receiver.maxRate and
spark.streaming.receiver.backpressure.

It seems that org.apache.nifi.spark.NiFiReceiver does not support these at
all.
https://github.com/apache/nifi/tree/master/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark
- the code is quite old too.

My question is: what is the recommended approach today for getting a stream
from NiFi with Spark? Is
https://github.com/apache/nifi/tree/master/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark
the best we have? If it is, what is the best approach to integrate Spark's
maxRate or backpressure into it?


--
Margus (margusja) Roo
http://margus.roo.ee
skype: margusja
https://www.facebook.com/allan.tuuring
+372 51 48 780
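
Besides putting Kafka in the middle as Andrew suggests, maxRate-style limiting could also be approximated inside a custom receiver with a token bucket, the usual pattern for capping records per second. The sketch below is a hedged, self-contained illustration in plain Java; the class is hypothetical and is not part of NiFiReceiver or Spark.

```java
// A simple token-bucket rate limiter that a custom receiver could call
// before store()-ing each record, mimicking spark.streaming.receiver.maxRate.
// Hypothetical sketch, not part of the actual NiFiReceiver API.
public class TokenBucket {
    private final double maxRatePerSec;   // tokens added per second
    private final double capacity;        // maximum burst size
    private double tokens;
    private long lastRefillNanos;

    public TokenBucket(double maxRatePerSec, double capacity) {
        this.maxRatePerSec = maxRatePerSec;
        this.capacity = capacity;
        this.tokens = capacity;            // start with a full bucket
        this.lastRefillNanos = System.nanoTime();
    }

    // Blocks until one token is available, then consumes it.
    public synchronized void acquire() throws InterruptedException {
        refill();
        while (tokens < 1.0) {
            // Sleep roughly long enough for one token to accumulate.
            long sleepMillis = (long) Math.ceil(1000.0 / maxRatePerSec);
            wait(sleepMillis);
            refill();
        }
        tokens -= 1.0;
    }

    // Adds tokens proportional to elapsed time, capped at the burst size.
    private void refill() {
        long now = System.nanoTime();
        double elapsedSec = (now - lastRefillNanos) / 1_000_000_000.0;
        tokens = Math.min(capacity, tokens + elapsedSec * maxRatePerSec);
        lastRefillNanos = now;
    }

    public static void main(String[] args) throws InterruptedException {
        TokenBucket bucket = new TokenBucket(5.0, 1.0); // ~5 records/sec
        long start = System.currentTimeMillis();
        for (int i = 0; i < 3; i++) {
            bucket.acquire(); // in a receiver loop: acquire() then store(packet)
            System.out.println("record " + i + " at "
                    + (System.currentTimeMillis() - start) + " ms");
        }
    }
}
```

A receiver-side throttle like this only caps the pull rate; it does not feed back into Spark's rate estimator the way the built-in backpressure mechanism does, which is one reason decoupling through Kafka remains the more robust production choice.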