Re: Removing duplicates from data
Could not find 'PutDatabaseRecord' in the NiFi version : 1.1.0.2.1.2.0-10 I am using . Please suggest ? On Tue, Sep 19, 2017 at 12:10 AM, Vikram More wrote: > Hi Koji, > Thanks for response and helpful links ! > > NiFi version : 1.1.0.2.1.2.0-10 > > I am trying to move data from operational system (oracle db) to analytical > system (postgres db). Postgres table has been model/designed by us (and can > add primary key). Data from oracle looks like below (i need to remove > duplicate record for combination on ColA , ColB) > > Col A Col B > C1 item 1 > C1 item 2 > *C2* *item 3* > *C2* *item 4* > *C2* *item 3* > C3 item 1 > C4 null > C5 item 5 > C6 item 7 > I will try to explore PutDatabaseRecord processor and see i can achieve > desired purpose. > > Thanks, > Vikram > > On Mon, Sep 18, 2017 at 9:59 PM, Koji Kawamura > wrote: > >> Hello Vikram, >> >> Welcome to NiFi and the community :) >> >> Would you elaborate your data flow? And which version you are using? >> For example, can you share some input data extracted from Oracle? I >> wonder why you need to remove duplicate records while PostgreSQL >> doesn't have primary key constraint, or why you have such records in >> the beginning. >> >> Current PutSQL does not report the cause of batch update failure well. >> But that behavior has been improved and you can see what is the cause >> if you can use NiFi 1.4.0-SNAPSHOT (you need to build NiFi from source >> code to try it). >> https://issues.apache.org/jira/browse/NIFI-4162 >> >> Please refer NiFi README.md for how to build and run NiFi from source >> code. >> https://github.com/apache/nifi >> >> Also, in order to put Avro data to an RDBMS, NiFi also has >> PutDatabaseRecord processor today. Which can work more efficiently >> because you don't have to use 'split avro -> avrotojson -> jsontosql' >> part, PutDatabaseRecord can directly execute DML statement from Avro >> dataset. >> https://nifi.apache.org/docs/nifi-docs/components/org.apache >> .nifi/nifi-standard-nar/1.3.0/org.apache.nifi.processors. >> standard.PutDatabaseRecord/index.html >> >> Thanks, >> Koji >> >> On Tue, Sep 19, 2017 at 9:21 AM, Vikram More >> wrote: >> > Hi Everyone, >> > >> > I am new to NiFi and community :) >> > >> > I am trying to build a Nifi flow which will pull from Oracle table and >> load >> > into Postgres table. My select query has two columns and I need to >> remove >> > duplicates based on these two columns. Can I remove duplicates in Nifi >> based >> > on two column data values. My flow is like below - >> > ExecuteSQL -> split avro -> avrotojson -> jsontosql -> PutSQL >> > >> > >> > PutSQL question : Oracle table has ~ 4 million records and when the >> PutSQL >> > was running , it gave several similar errors : >> > >> > "Failed to update database due to failed batch update. There were total >> of 1 >> > FlowFiles that failed, 5 that successful, and 9 that were not execute >> and >> > will be routed to retry" >> > >> > Why might be wrong in PutSQL ? have kept PutSQL batch size of 1000 and >> don't >> > have any primary key constraint on postgres table. >> > (Should I create primary key with those two columns, so while loading >> it can >> > reject duplicate records, but will it rejects the complete batch rather >> than >> > just duplicates ?) >> > >> > Would be great if someone can provide insight in this scenario ? >> > >> > Thanks, >> > Vikram >> > >
Re: Removing duplicates from data
Hi Koji,

Thanks for the response and the helpful links!

NiFi version: 1.1.0.2.1.2.0-10

I am trying to move data from an operational system (Oracle DB) to an analytical system (Postgres DB). The Postgres table has been modeled/designed by us (and we can add a primary key). The data from Oracle looks like the sample below (I need to remove duplicate records for the combination of Col A, Col B):

Col A    Col B
C1       item 1
C1       item 2
*C2*     *item 3*
*C2*     *item 4*
*C2*     *item 3*
C3       item 1
C4       null
C5       item 5
C6       item 7

I will try to explore the PutDatabaseRecord processor and see if I can achieve the desired purpose.

Thanks,
Vikram

On Mon, Sep 18, 2017 at 9:59 PM, Koji Kawamura wrote:
> Hello Vikram,
>
> Welcome to NiFi and the community :)
>
> Would you elaborate your data flow? And which version are you using?
> For example, can you share some input data extracted from Oracle? I
> wonder why you need to remove duplicate records while PostgreSQL
> doesn't have a primary key constraint, or why you have such records in
> the first place.
>
> The current PutSQL does not report the cause of a batch update failure well.
> But that behavior has been improved, and you can see what the cause is
> if you can use NiFi 1.4.0-SNAPSHOT (you need to build NiFi from source
> code to try it).
> https://issues.apache.org/jira/browse/NIFI-4162
>
> Please refer to the NiFi README.md for how to build and run NiFi from source code.
> https://github.com/apache/nifi
>
> Also, in order to put Avro data to an RDBMS, NiFi also has a
> PutDatabaseRecord processor today, which can work more efficiently
> because you don't need the 'split avro -> avrotojson -> jsontosql'
> part; PutDatabaseRecord can directly execute DML statements from an Avro
> dataset.
> https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.3.0/org.apache.nifi.processors.standard.PutDatabaseRecord/index.html
>
> Thanks,
> Koji
>
> On Tue, Sep 19, 2017 at 9:21 AM, Vikram More wrote:
> > Hi Everyone,
> >
> > I am new to NiFi and the community :)
> >
> > I am trying to build a NiFi flow which will pull from an Oracle table and load
> > it into a Postgres table. My select query has two columns and I need to remove
> > duplicates based on these two columns. Can I remove duplicates in NiFi based
> > on two column data values? My flow is like below:
> > ExecuteSQL -> split avro -> avrotojson -> jsontosql -> PutSQL
> >
> > PutSQL question: the Oracle table has ~4 million records and when PutSQL
> > was running, it gave several similar errors:
> >
> > "Failed to update database due to failed batch update. There were total of 1
> > FlowFiles that failed, 5 that successful, and 9 that were not execute and
> > will be routed to retry"
> >
> > What might be wrong in PutSQL? I have kept the PutSQL batch size at 1000 and
> > don't have any primary key constraint on the Postgres table.
> > (Should I create a primary key with those two columns, so that while loading
> > it can reject duplicate records? But will it reject the complete batch rather
> > than just the duplicates?)
> >
> > Would be great if someone can provide insight into this scenario.
> >
> > Thanks,
> > Vikram
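Since the Postgres table is under the thread's control, one way to let the database itself discard duplicates without failing a whole PutSQL batch is an upsert-style insert. Below is a minimal sketch with placeholder table and column names; it assumes PostgreSQL 9.5+ for ON CONFLICT and that the INSERT statements reaching PutSQL can be shaped this way (for example with a ReplaceText step before PutSQL). Note that the sample data has a null in Col B, which a primary key would not accept, so the sketch uses a unique constraint instead.

    -- Placeholder names; assumes PostgreSQL 9.5 or later for ON CONFLICT.
    -- A unique constraint lets the database spot duplicate (col_a, col_b) pairs.
    -- (Postgres treats NULLs as distinct in a unique constraint, so rows like
    -- the (C4, null) sample would not be deduplicated by this alone.)
    ALTER TABLE target_table
        ADD CONSTRAINT target_table_cola_colb_uk UNIQUE (col_a, col_b);

    -- A plain INSERT would abort the whole PutSQL batch on the first duplicate;
    -- DO NOTHING silently skips just the duplicate row instead.
    INSERT INTO target_table (col_a, col_b)
    VALUES ('C2', 'item 3')
    ON CONFLICT (col_a, col_b) DO NOTHING;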
Re: Removing duplicates from data
Hello Vikram,

Welcome to NiFi and the community :)

Would you elaborate your data flow? And which version are you using? For example, can you share some input data extracted from Oracle? I wonder why you need to remove duplicate records while PostgreSQL doesn't have a primary key constraint, or why you have such records in the first place.

The current PutSQL does not report the cause of a batch update failure well. But that behavior has been improved, and you can see what the cause is if you can use NiFi 1.4.0-SNAPSHOT (you need to build NiFi from source code to try it).
https://issues.apache.org/jira/browse/NIFI-4162

Please refer to the NiFi README.md for how to build and run NiFi from source code.
https://github.com/apache/nifi

Also, in order to put Avro data to an RDBMS, NiFi also has a PutDatabaseRecord processor today, which can work more efficiently because you don't need the 'split avro -> avrotojson -> jsontosql' part; PutDatabaseRecord can directly execute DML statements from an Avro dataset.
https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.3.0/org.apache.nifi.processors.standard.PutDatabaseRecord/index.html

Thanks,
Koji

On Tue, Sep 19, 2017 at 9:21 AM, Vikram More wrote:
> Hi Everyone,
>
> I am new to NiFi and the community :)
>
> I am trying to build a NiFi flow which will pull from an Oracle table and load
> it into a Postgres table. My select query has two columns and I need to remove
> duplicates based on these two columns. Can I remove duplicates in NiFi based
> on two column data values? My flow is like below:
> ExecuteSQL -> split avro -> avrotojson -> jsontosql -> PutSQL
>
> PutSQL question: the Oracle table has ~4 million records and when PutSQL
> was running, it gave several similar errors:
>
> "Failed to update database due to failed batch update. There were total of 1
> FlowFiles that failed, 5 that successful, and 9 that were not execute and
> will be routed to retry"
>
> What might be wrong in PutSQL? I have kept the PutSQL batch size at 1000 and
> don't have any primary key constraint on the Postgres table.
> (Should I create a primary key with those two columns, so that while loading
> it can reject duplicate records? But will it reject the complete batch rather
> than just the duplicates?)
>
> Would be great if someone can provide insight into this scenario.
>
> Thanks,
> Vikram
Removing duplicates from data
Hi Everyone,

I am new to NiFi and the community :)

I am trying to build a NiFi flow which will pull from an Oracle table and load it into a Postgres table. My select query has two columns and I need to remove duplicates based on these two columns. Can I remove duplicates in NiFi based on two column data values? My flow is like below:

ExecuteSQL -> split avro -> avrotojson -> jsontosql -> PutSQL

PutSQL question: the Oracle table has ~4 million records and when PutSQL was running, it gave several similar errors:

"Failed to update database due to failed batch update. There were total of 1 FlowFiles that failed, 5 that successful, and 9 that were not execute and will be routed to retry"

What might be wrong in PutSQL? I have kept the PutSQL batch size at 1000 and don't have any primary key constraint on the Postgres table. (Should I create a primary key with those two columns, so that while loading it can reject duplicate records? But will it reject the complete batch rather than just the duplicates?)

Would be great if someone can provide insight into this scenario.

Thanks,
Vikram
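Since the duplicates are defined entirely by the two selected columns, one option worth noting is to drop them in the ExecuteSQL query itself, so nothing downstream ever sees them. A minimal sketch with placeholder table and column names:

    -- Placeholder names. If the two columns are all that is selected, DISTINCT
    -- collapses duplicate (col_a, col_b) pairs at the source:
    SELECT DISTINCT col_a, col_b
    FROM   source_table;

    -- If more columns are selected and only one row per (col_a, col_b) should
    -- survive, a window function works; the ORDER BY only picks an arbitrary
    -- surviving row:
    SELECT col_a, col_b, other_col
    FROM (
        SELECT col_a, col_b, other_col,
               ROW_NUMBER() OVER (PARTITION BY col_a, col_b ORDER BY col_a) AS rn
        FROM   source_table
    )
    WHERE rn = 1;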
Re: PutTCP connector not cleaning up dangling connections
Davy If you could give the PR a try and see if it helps I'd be happy to help get it reviewed and in for 1.4 if timing works out. Thanks On Mon, Sep 18, 2017 at 4:39 PM, Bryan Bende wrote: > Davy, > > I just pushed a second commit to the PR that will log the port from > the local address of the socket being used by the sender, which I > think is what you mean by the client port. > > If you turn on debug for PutTCP you will see something like... > > o.apache.nifi.processors.standard.PutTCP > PutTCP[id=95463da0-015e-1000-8723-0a2baf6c832f] Connected to local > port 57280 > > I only performed the logging after successful connection because I'm > not sure the ramifications of trying to obtain the local SocketAddress > while finishConnect is still returning false, so I guess the logging > wouldn't really help in the case where the exception was thrown. > > https://github.com/apache/nifi/pull/2159 > > -Bryan > > > On Mon, Sep 18, 2017 at 2:18 PM, ddewaele wrote: >> Thx a lot for the quick response. Looking forward to the PR and the >> release :) >> >> Would this for example still make the 1.4.0 release ? >> >> It would also be very interesting to log client ports in debug mode >> don't know how easy that is with nio. >> >> There is Keep Alive Timeout = 2min specified on the Moxa, so it means that >> the socket on the client (NiFi) is still responding to "keep alive" packets. >> (makes sense I guess, as we would need to configure some kind of read >> timeout on the moxa to kill off the client). >> >> I guess the fact that we don't see anything in the stack is because the >> socket got established in non blocking mode so it is in ESTABLISHED mode but >> nobody is around to do any processing on it. >> >> >> >> >> >> >> >> >> -- >> Sent from: http://apache-nifi-users-list.2361937.n4.nabble.com/
Re: PutTCP connector not cleaning up dangling connections
Davy, I just pushed a second commit to the PR that will log the port from the local address of the socket being used by the sender, which I think is what you mean by the client port. If you turn on debug for PutTCP you will see something like... o.apache.nifi.processors.standard.PutTCP PutTCP[id=95463da0-015e-1000-8723-0a2baf6c832f] Connected to local port 57280 I only performed the logging after successful connection because I'm not sure the ramifications of trying to obtain the local SocketAddress while finishConnect is still returning false, so I guess the logging wouldn't really help in the case where the exception was thrown. https://github.com/apache/nifi/pull/2159 -Bryan On Mon, Sep 18, 2017 at 2:18 PM, ddewaele wrote: > Thx a lot for the quick response. Looking forward to the PR and the > release :) > > Would this for example still make the 1.4.0 release ? > > It would also be very interesting to log client ports in debug mode > don't know how easy that is with nio. > > There is Keep Alive Timeout = 2min specified on the Moxa, so it means that > the socket on the client (NiFi) is still responding to "keep alive" packets. > (makes sense I guess, as we would need to configure some kind of read > timeout on the moxa to kill off the client). > > I guess the fact that we don't see anything in the stack is because the > socket got established in non blocking mode so it is in ESTABLISHED mode but > nobody is around to do any processing on it. > > > > > > > > > -- > Sent from: http://apache-nifi-users-list.2361937.n4.nabble.com/
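As a rough illustration of the value being logged (a standalone sketch, not the actual PR code), the ephemeral client port can be read off a connected java.nio SocketChannel like this:

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.channels.SocketChannel;

    public final class LocalPortSketch {

        // Only meaningful once finishConnect() has returned true, which matches
        // the caveat above about not querying the address mid-connect.
        static int localPort(final SocketChannel channel) throws IOException {
            return ((InetSocketAddress) channel.getLocalAddress()).getPort();
        }
    }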
Re: PutTCP connector not cleaning up dangling connections
Thx a lot for the quick response. Looking forward to the PR and the release :)

Would this, for example, still make the 1.4.0 release?

It would also be very interesting to log client ports in debug mode; I don't know how easy that is with NIO.

There is a Keep Alive Timeout of 2 min specified on the Moxa, so it means that the socket on the client (NiFi) side is still responding to "keep alive" packets. (Makes sense, I guess, as we would need to configure some kind of read timeout on the Moxa to kill off the client.)

I guess the fact that we don't see anything in the stack is because the socket was established in non-blocking mode, so it is in the ESTABLISHED state but nobody is around to do any processing on it.

--
Sent from: http://apache-nifi-users-list.2361937.n4.nabble.com/
Re: Re: Re: QueryDatabaseTable - Deleted Records
Uwe,

Is there anything in the V$ARCHIVED_LOG table [1] in your source database? If so, you may be able to get some of that information from there. Also, is LogMiner [2] enabled on the database? That's another way to be able to query the logs to get things like deletes.

In general, there has to be something in the system that knows when something is deleted. That can be done in a few ways, with varying amounts of success:

1) Cross-join the source table with the target table (see the SQL sketch after this message). This won't pick up records that have been inserted, updated, then deleted since the last time the join was performed. Plus the join is very costly for large tables.

2) Change the schema of the tables to add a soft-delete flag (as mentioned in an earlier reply). This often cannot be done because the main app needs a particular schema, or because the CDC user does not have permission to do such things to the source DB.

3) Intercept the calls that will change the DB. This is fragile because you may not know if the call succeeds at the DB. Plus you may not be able to change the architecture to put something in between the users and the DB.

4) Interrogate the logs. This is IMO the only real way to do CDC, and is how most CDC solutions operate. The CaptureChangeMySQL processor reads the MySQL binary logs, and if/when a CaptureChangeOracle processor is implemented, it will likely require LogMiner or something similar to be enabled at the source to make the information available. This pattern can apply to all DBs that support such a thing, as long as they can be interrogated via JDBC (SQL queries) or some client tool (which hopefully for us has a Java port!).

Regards,
Matt

[1] https://docs.oracle.com/cd/B28359_01/server.111/b28320/dynviews_1016.htm#REFRN30011
[2] https://docs.oracle.com/cd/E11882_01/server.112/e22490/logminer.htm#SUTIL1559

On Mon, Sep 18, 2017 at 1:35 PM, Uwe Geercken wrote:
> Andrew,
>
> yes, we are doing the same for the Oracle DB, which is quite old and does not
> provide this information.
>
> Anyway, I was just curious whether somebody has a smarter solution. The blogs on
> NiFi and Kafka have really good samples of extracting data, but none of them
> touches the topic of deletes.
>
> Rgds,
>
> Uwe
>
> Sent: Saturday, 16 September 2017, 13:52
> From: "Andrew Grande"
> To: users@nifi.apache.org
> Subject: Re: Re: QueryDatabaseTable - Deleted Records
>
> An interesting architectural approach we took eons ago, before NiFi, was
> to take daily snapshots of a full table. Every row would then be
> hashed/digested or in any other way uniquely identified, and the two datasets
> would be crossed and compared to find inserts/deletes/updates. It was involved,
> but worked.
>
> Andrew
>
> On Sat, Sep 16, 2017, 2:38 AM Uwe Geercken wrote:
>>
>> Bryan,
>>
>> yes, the change log would be possible. In my use case I have Oracle 11 as
>> the source - and I cannot change the source easily (takes long - is
>> expensive).
>>
>> I was expecting this answer but wanted to make sure that I have not missed
>> anything. I will try to build my use case around something else then.
>>
>> Thanks for your response(s).
>>
>> Rgds,
>>
>> Uwe
>>
>> Sent: Friday, 15 September 2017, 16:15
>> From: "Bryan Bende"
>> To: users@nifi.apache.org
>> Subject: Re: QueryDatabaseTable - Deleted Records
>>
>> Uwe,
>>
>> Typically you need to process the change log of the database in this
>> case, which unfortunately usually becomes database specific.
>>
>> I believe we have a processor CaptureChangeMySQL that can process the
>> MySQL change log.
>>
>> -Bryan
>>
>> On Tue, Sep 12, 2017 at 1:39 PM, Uwe Geercken wrote:
>> > Hello,
>> >
>> > apparently the QueryDatabaseTable processor catches changes made to the data
>> > of the source database - updates and inserts.
>> >
>> > Has anybody a good idea or strategy for how to handle deletes in the source
>> > database? Of course one could flag a record as deleted instead of physically
>> > deleting it. But this means changing the source system in many cases, and
>> > that is sometimes not possible. And yes, if you process the change log (if
>> > available) of the source system, that is also a good option.
>> >
>> > Would be grateful for any tips or a best practice for how you do it.
>> >
>> > Rgds,
>> >
>> > Uwe
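To make option 1 concrete, here is a minimal sketch with placeholder table and key names: rows present in the previously loaded snapshot but missing from the current source table are the deletes (subject to the caveat above about rows inserted and deleted between runs).

    -- Placeholder names: source_table is the live Oracle table, target_snapshot
    -- is the copy loaded into the analytical system at the previous run.
    SELECT s.key_col
    FROM   target_snapshot s
    LEFT JOIN source_table t
           ON t.key_col = s.key_col
    WHERE  t.key_col IS NULL;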
Aw: Re: Re: QueryDatabaseTable - Deleted Records
Andrew,

yes, we are doing the same for the Oracle DB, which is quite old and does not provide this information.

Anyway, I was just curious whether somebody has a smarter solution. The blogs on NiFi and Kafka have really good samples of extracting data, but none of them touches the topic of deletes.

Rgds,

Uwe

Sent: Saturday, 16 September 2017, 13:52
From: "Andrew Grande"
To: users@nifi.apache.org
Subject: Re: Re: QueryDatabaseTable - Deleted Records

An interesting architectural approach we took eons ago, before NiFi, was to take daily snapshots of a full table. Every row would then be hashed/digested or in any other way uniquely identified, and the two datasets would be crossed and compared to find inserts/deletes/updates. It was involved, but worked.

Andrew

On Sat, Sep 16, 2017, 2:38 AM Uwe Geercken wrote:

Bryan,

yes, the change log would be possible. In my use case I have Oracle 11 as the source - and I cannot change the source easily (takes long - is expensive).

I was expecting this answer but wanted to make sure that I have not missed anything. I will try to build my use case around something else then.

Thanks for your response(s).

Rgds,

Uwe

Sent: Friday, 15 September 2017, 16:15
From: "Bryan Bende"
To: users@nifi.apache.org
Subject: Re: QueryDatabaseTable - Deleted Records

Uwe,

Typically you need to process the change log of the database in this case, which unfortunately usually becomes database specific.

I believe we have a processor CaptureChangeMySQL that can process the MySQL change log.

-Bryan

On Tue, Sep 12, 2017 at 1:39 PM, Uwe Geercken wrote:
> Hello,
>
> apparently the QueryDatabaseTable processor catches changes made to the data
> of the source database - updates and inserts.
>
> Has anybody a good idea or strategy for how to handle deletes in the source
> database? Of course one could flag a record as deleted instead of physically
> deleting it. But this means changing the source system in many cases, and
> that is sometimes not possible. And yes, if you process the change log (if
> available) of the source system, that is also a good option.
>
> Would be grateful for any tips or a best practice for how you do it.
>
> Rgds,
>
> Uwe
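To illustrate the snapshot-comparison approach Andrew describes, here is a rough sketch with placeholder names; it assumes each snapshot already carries a per-row digest computed over the non-key columns when the snapshot is written (for example by the extraction job).

    -- Placeholder names; yesterday_snapshot and today_snapshot are two full
    -- copies of the same table taken on consecutive runs.
    SELECT COALESCE(y.key_col, t.key_col) AS key_col,
           CASE
               WHEN y.key_col IS NULL THEN 'INSERT'
               WHEN t.key_col IS NULL THEN 'DELETE'
               WHEN y.row_digest <> t.row_digest THEN 'UPDATE'
           END AS change_type
    FROM      yesterday_snapshot y
    FULL OUTER JOIN today_snapshot t
           ON t.key_col = y.key_col
    WHERE  y.key_col IS NULL
       OR  t.key_col IS NULL
       OR  y.row_digest <> t.row_digest;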
Re: Manual release of flows
Hello,

You should be able to have a ListenHTTP processor (or HandleHttpRequest/Response) that connects to a Notify processor, which would release the flow files sitting in front of Wait.

Thanks,

Bryan

On Sun, Sep 17, 2017 at 12:19 PM, joe harvyy wrote:
> Hi,
>
> I have a use case where flow files should be blocked before moving to the
> next step. A manual action is needed to release these flow files. This
> manual action will be an HTTP post that an admin will be sending to NiFi
> through a simple web UI.
>
> How can I implement this behavior? Can I use Wait/Notify in this case or is
> there a better approach?
>
> Thanks
> J.
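As a rough sketch of the admin-facing side (the port, base path, and header name below are assumptions, not a prescription): the web UI could simply POST to the ListenHTTP endpoint, and the flow behind it would route that request to Notify, with a header value serving as the release signal identifier shared by the waiting flow files.

    # Assumes ListenHTTP is listening on port 8011 with the default base path
    # "contentListener", and that the flow copies the "release-id" header into
    # the attribute used as Notify's Release Signal Identifier.
    curl -X POST \
         -H "release-id: batch-2017-09-17" \
         --data "" \
         http://nifi-host:8011/contentListener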
Re: PutTCP connector not cleaning up dangling connections
I just created a JIRA and will put up a PR shortly: https://issues.apache.org/jira/browse/NIFI-4391 The processor is catching the exception while attempting to obtain a connection, and then logs an error and transfers to failure which is where we see this message: 2017-09-17 14:20:20,860 ERROR [Timer-Driven Process Thread-10] o.apache.nifi.processors.standard.PutTCP PutTCP[id=80231a39-1008-1159-a6fa-1f9e3751d608] No available connections, and unable to create a new one, transferring https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java#L330-L347 On Mon, Sep 18, 2017 at 9:22 AM, Joe Witt wrote: > Good catch. Can you please be sure to cover in a JIRA? > > That said, wouldn't we see that in the stack trace during the > problematic condition? > > On Mon, Sep 18, 2017 at 9:16 AM, Bryan Bende wrote: >> The code in SocketChannelSender that Davy pointed out could definitely >> be the problem... >> >> It makes a non-blocking channel and calls connect, then goes into a >> loop waiting for finishConnect() to return true, but if that doesn't >> happen before the configured timeout, then it throws an exception, but >> it doesn't first close the channel, and the processor also doesn't >> close the sender. >> >> As an example comparison, this code which is being used by PutTCP: >> >> https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/SocketChannelSender.java#L45-L78 >> >> Should be doing something like this where it catches, closes, and rethrows: >> >> https://github.com/apache/nifi/blob/master/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java#L140-L179 >> >> >> On Mon, Sep 18, 2017 at 9:09 AM, Joe Witt wrote: >>> Davy >>> >>> Interesting. So in looking through the stack trace I don't see >>> anything related to sockets nifi has initiated to another service and >>> nothing for PutTCP. I'm not saying that means there is nothing but >>> the stack traces only show the custom GetTCP processors. >>> >>> You can use netstat to show open sockets from the nifi process. Can >>> you try that and share those? >>> >>> Does the NiFi UI show the processor as having stuck threads? I'm >>> guessing not since there is nothing in the stack traces. >>> >>> Thanks >>> >>> On Mon, Sep 18, 2017 at 1:54 AM, ddewaele wrote: Stopping the processor doesn't cleanup the tcp connection. It remains ESTABLISHED. There are 2 ways of getting out of it (none of them are ideal). - Restarting Nifi - Restarting the Moxa serial ports I've dumped the output in the following gist : https://gist.github.com/ddewaele/83705003740674962c1e133fb617f68c The GetTCP processor you'll see in the thread dump also interacts with the moxa. It is a Netty based custom processor we created (because there was no GetTCP at the time). However, we log all interactions (including client ports) with this processor and all of them end up getting closed correctly. So the "hanging" connection originated from the built-in PutTCP processor. Joe Witt wrote > If you stop the processor manually does it clean them up? > > When the connections appear stuck can you please get a thread dump? > > bin/nifi.sh dump > > The results end up in bootstrap.log. 
> > Thanks > Joe > > On Sep 17, 2017 2:22 PM, "ddewaele" < > ddewaele@ > > wrote: > >> We are using NiFi PutTCP processors to send messages to a number of Moxa >> onCell ip gateway devices. >> >> These Moxa devices are running on a cellular network with not always the >> most ideal connection. The Moxa only allows for a maximum of 2 >> simultaneous >> client connections. >> >> What we notice is that although we specify connection / read timeouts on >> both PutTCP and the Moxa, that sometimes a connection get "stuck". (In >> the >> moxa network monitoring we see 2 client sockets coming from PutTCP in the >> ESTABLISHED state that never go away). >> >> This doesn't always happen, but often enough for it to be considered a >> problem, as it requires a restart of the moxa ports to clear the >> connections >> (manual step). It typically happens when PutTCP experiences a Timeout. >> >> On the PutTCP processors we have the following settings : >> >> - Idle Connection Expiration : 30 seconds (we've set this higher due to >> bad >> gprs connection) >> - Timeout : 10 seconds (this is only used as a timeout for establishing >> the >> connection) >> >> On the Moxas we have >> >> - TCP alive check time : 2min (this should force the Moxa to close the >> socket)
Re: PutTCP connector not cleaning up dangling connections
Good catch. Can you please be sure to cover in a JIRA? That said, wouldn't we see that in the stack trace during the problematic condition? On Mon, Sep 18, 2017 at 9:16 AM, Bryan Bende wrote: > The code in SocketChannelSender that Davy pointed out could definitely > be the problem... > > It makes a non-blocking channel and calls connect, then goes into a > loop waiting for finishConnect() to return true, but if that doesn't > happen before the configured timeout, then it throws an exception, but > it doesn't first close the channel, and the processor also doesn't > close the sender. > > As an example comparison, this code which is being used by PutTCP: > > https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/SocketChannelSender.java#L45-L78 > > Should be doing something like this where it catches, closes, and rethrows: > > https://github.com/apache/nifi/blob/master/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java#L140-L179 > > > On Mon, Sep 18, 2017 at 9:09 AM, Joe Witt wrote: >> Davy >> >> Interesting. So in looking through the stack trace I don't see >> anything related to sockets nifi has initiated to another service and >> nothing for PutTCP. I'm not saying that means there is nothing but >> the stack traces only show the custom GetTCP processors. >> >> You can use netstat to show open sockets from the nifi process. Can >> you try that and share those? >> >> Does the NiFi UI show the processor as having stuck threads? I'm >> guessing not since there is nothing in the stack traces. >> >> Thanks >> >> On Mon, Sep 18, 2017 at 1:54 AM, ddewaele wrote: >>> Stopping the processor doesn't cleanup the tcp connection. It remains >>> ESTABLISHED. >>> >>> There are 2 ways of getting out of it (none of them are ideal). >>> >>> - Restarting Nifi >>> - Restarting the Moxa serial ports >>> >>> I've dumped the output in the following gist : >>> https://gist.github.com/ddewaele/83705003740674962c1e133fb617f68c >>> >>> The GetTCP processor you'll see in the thread dump also interacts with the >>> moxa. It is a Netty based custom processor we created (because there was no >>> GetTCP at the time). However, we log all interactions (including client >>> ports) with this processor and all of them end up getting closed correctly. >>> >>> So the "hanging" connection originated from the built-in PutTCP processor. >>> >>> >>> Joe Witt wrote If you stop the processor manually does it clean them up? When the connections appear stuck can you please get a thread dump? bin/nifi.sh dump The results end up in bootstrap.log. Thanks Joe On Sep 17, 2017 2:22 PM, "ddewaele" < >>> ddewaele@ >>> > wrote: > We are using NiFi PutTCP processors to send messages to a number of Moxa > onCell ip gateway devices. > > These Moxa devices are running on a cellular network with not always the > most ideal connection. The Moxa only allows for a maximum of 2 > simultaneous > client connections. > > What we notice is that although we specify connection / read timeouts on > both PutTCP and the Moxa, that sometimes a connection get "stuck". (In > the > moxa network monitoring we see 2 client sockets coming from PutTCP in the > ESTABLISHED state that never go away). > > This doesn't always happen, but often enough for it to be considered a > problem, as it requires a restart of the moxa ports to clear the > connections > (manual step). 
It typically happens when PutTCP experiences a Timeout. > > On the PutTCP processors we have the following settings : > > - Idle Connection Expiration : 30 seconds (we've set this higher due to > bad > gprs connection) > - Timeout : 10 seconds (this is only used as a timeout for establishing > the > connection) > > On the Moxas we have > > - TCP alive check time : 2min (this should force the Moxa to close the > socket) > > Yet for some reason the connection remains established. > > Here's what I found out : > > On the moxa I noticed a connection (with client port 48440) that is in > ESTABLISHED mode for 4+ hours. (blocking other connections). On the Moxa > I > can see when the connection was established : > > 2017/09/17 14:20:29 [OpMode] Port01 Connect 10.192.2.90:48440 > > I can track that down in Nifi via the logs (unfortunately PutTCP doesn't > log > client ports, but from the timestamp I'm sure it's this connection : > > 2017-09-17 14:20:10,837 DEBUG [Timer-Driven Process Thread-10] > o.apache.nifi.processors.standard.PutTCP > PutTCP[id=80231a39-1008-1159-a6fa-1f9e3751d608] No available connections, > creating a new one... > 2017-09-17 14:20:20,860 ERROR [Timer-Driven Pr
Re: PutTCP connector not cleaning up dangling connections
The code in SocketChannelSender that Davy pointed out could definitely be the problem... It makes a non-blocking channel and calls connect, then goes into a loop waiting for finishConnect() to return true, but if that doesn't happen before the configured timeout, then it throws an exception, but it doesn't first close the channel, and the processor also doesn't close the sender. As an example comparison, this code which is being used by PutTCP: https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/sender/SocketChannelSender.java#L45-L78 Should be doing something like this where it catches, closes, and rethrows: https://github.com/apache/nifi/blob/master/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java#L140-L179 On Mon, Sep 18, 2017 at 9:09 AM, Joe Witt wrote: > Davy > > Interesting. So in looking through the stack trace I don't see > anything related to sockets nifi has initiated to another service and > nothing for PutTCP. I'm not saying that means there is nothing but > the stack traces only show the custom GetTCP processors. > > You can use netstat to show open sockets from the nifi process. Can > you try that and share those? > > Does the NiFi UI show the processor as having stuck threads? I'm > guessing not since there is nothing in the stack traces. > > Thanks > > On Mon, Sep 18, 2017 at 1:54 AM, ddewaele wrote: >> Stopping the processor doesn't cleanup the tcp connection. It remains >> ESTABLISHED. >> >> There are 2 ways of getting out of it (none of them are ideal). >> >> - Restarting Nifi >> - Restarting the Moxa serial ports >> >> I've dumped the output in the following gist : >> https://gist.github.com/ddewaele/83705003740674962c1e133fb617f68c >> >> The GetTCP processor you'll see in the thread dump also interacts with the >> moxa. It is a Netty based custom processor we created (because there was no >> GetTCP at the time). However, we log all interactions (including client >> ports) with this processor and all of them end up getting closed correctly. >> >> So the "hanging" connection originated from the built-in PutTCP processor. >> >> >> Joe Witt wrote >>> If you stop the processor manually does it clean them up? >>> >>> When the connections appear stuck can you please get a thread dump? >>> >>> bin/nifi.sh dump >>> >>> The results end up in bootstrap.log. >>> >>> Thanks >>> Joe >>> >>> On Sep 17, 2017 2:22 PM, "ddewaele" < >> >>> ddewaele@ >> >>> > wrote: >>> We are using NiFi PutTCP processors to send messages to a number of Moxa onCell ip gateway devices. These Moxa devices are running on a cellular network with not always the most ideal connection. The Moxa only allows for a maximum of 2 simultaneous client connections. What we notice is that although we specify connection / read timeouts on both PutTCP and the Moxa, that sometimes a connection get "stuck". (In the moxa network monitoring we see 2 client sockets coming from PutTCP in the ESTABLISHED state that never go away). This doesn't always happen, but often enough for it to be considered a problem, as it requires a restart of the moxa ports to clear the connections (manual step). It typically happens when PutTCP experiences a Timeout. 
On the PutTCP processors we have the following settings : - Idle Connection Expiration : 30 seconds (we've set this higher due to bad gprs connection) - Timeout : 10 seconds (this is only used as a timeout for establishing the connection) On the Moxas we have - TCP alive check time : 2min (this should force the Moxa to close the socket) Yet for some reason the connection remains established. Here's what I found out : On the moxa I noticed a connection (with client port 48440) that is in ESTABLISHED mode for 4+ hours. (blocking other connections). On the Moxa I can see when the connection was established : 2017/09/17 14:20:29 [OpMode] Port01 Connect 10.192.2.90:48440 I can track that down in Nifi via the logs (unfortunately PutTCP doesn't log client ports, but from the timestamp I'm sure it's this connection : 2017-09-17 14:20:10,837 DEBUG [Timer-Driven Process Thread-10] o.apache.nifi.processors.standard.PutTCP PutTCP[id=80231a39-1008-1159-a6fa-1f9e3751d608] No available connections, creating a new one... 2017-09-17 14:20:20,860 ERROR [Timer-Driven Process Thread-10] o.apache.nifi.processors.standard.PutTCP PutTCP[id=80231a39-1008-1159-a6fa-1f9e3751d608] No available connections, and unable to create a new one, transferring StandardFlowFileRecord[uuid=79f2a166-5211-4d2d-9275-03f0ce4d5b29,claim= StandardContentClaim [resourceClaim=StandardResourceCl
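For readers following along, here is a minimal, self-contained sketch of the catch/close/rethrow shape Bryan describes above. It is not the actual SocketChannelSender code; the class, method, and timing details are made up.

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.net.SocketTimeoutException;
    import java.nio.channels.SocketChannel;

    public final class NonBlockingConnectSketch {

        public static SocketChannel openWithTimeout(final String host, final int port,
                final long timeoutMillis) throws IOException {
            final SocketChannel channel = SocketChannel.open();
            channel.configureBlocking(false);
            channel.connect(new InetSocketAddress(host, port));

            final long deadline = System.currentTimeMillis() + timeoutMillis;
            try {
                // Non-blocking connect: poll until the handshake completes or we time out.
                while (!channel.finishConnect()) {
                    if (System.currentTimeMillis() > deadline) {
                        throw new SocketTimeoutException("Timed out connecting to " + host + ":" + port);
                    }
                    try {
                        Thread.sleep(50);
                    } catch (final InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new IOException("Interrupted while connecting", e);
                    }
                }
                return channel;
            } catch (final IOException e) {
                // The important part: close the half-open channel before propagating,
                // so a failed or timed-out connect does not leak an ESTABLISHED socket.
                try {
                    channel.close();
                } catch (final IOException ignore) {
                    // best effort
                }
                throw e;
            }
        }
    }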
Re: PutTCP connector not cleaning up dangling connections
Davy Interesting. So in looking through the stack trace I don't see anything related to sockets nifi has initiated to another service and nothing for PutTCP. I'm not saying that means there is nothing but the stack traces only show the custom GetTCP processors. You can use netstat to show open sockets from the nifi process. Can you try that and share those? Does the NiFi UI show the processor as having stuck threads? I'm guessing not since there is nothing in the stack traces. Thanks On Mon, Sep 18, 2017 at 1:54 AM, ddewaele wrote: > Stopping the processor doesn't cleanup the tcp connection. It remains > ESTABLISHED. > > There are 2 ways of getting out of it (none of them are ideal). > > - Restarting Nifi > - Restarting the Moxa serial ports > > I've dumped the output in the following gist : > https://gist.github.com/ddewaele/83705003740674962c1e133fb617f68c > > The GetTCP processor you'll see in the thread dump also interacts with the > moxa. It is a Netty based custom processor we created (because there was no > GetTCP at the time). However, we log all interactions (including client > ports) with this processor and all of them end up getting closed correctly. > > So the "hanging" connection originated from the built-in PutTCP processor. > > > Joe Witt wrote >> If you stop the processor manually does it clean them up? >> >> When the connections appear stuck can you please get a thread dump? >> >> bin/nifi.sh dump >> >> The results end up in bootstrap.log. >> >> Thanks >> Joe >> >> On Sep 17, 2017 2:22 PM, "ddewaele" < > >> ddewaele@ > >> > wrote: >> >>> We are using NiFi PutTCP processors to send messages to a number of Moxa >>> onCell ip gateway devices. >>> >>> These Moxa devices are running on a cellular network with not always the >>> most ideal connection. The Moxa only allows for a maximum of 2 >>> simultaneous >>> client connections. >>> >>> What we notice is that although we specify connection / read timeouts on >>> both PutTCP and the Moxa, that sometimes a connection get "stuck". (In >>> the >>> moxa network monitoring we see 2 client sockets coming from PutTCP in the >>> ESTABLISHED state that never go away). >>> >>> This doesn't always happen, but often enough for it to be considered a >>> problem, as it requires a restart of the moxa ports to clear the >>> connections >>> (manual step). It typically happens when PutTCP experiences a Timeout. >>> >>> On the PutTCP processors we have the following settings : >>> >>> - Idle Connection Expiration : 30 seconds (we've set this higher due to >>> bad >>> gprs connection) >>> - Timeout : 10 seconds (this is only used as a timeout for establishing >>> the >>> connection) >>> >>> On the Moxas we have >>> >>> - TCP alive check time : 2min (this should force the Moxa to close the >>> socket) >>> >>> Yet for some reason the connection remains established. >>> >>> Here's what I found out : >>> >>> On the moxa I noticed a connection (with client port 48440) that is in >>> ESTABLISHED mode for 4+ hours. (blocking other connections). On the Moxa >>> I >>> can see when the connection was established : >>> >>> 2017/09/17 14:20:29 [OpMode] Port01 Connect 10.192.2.90:48440 >>> >>> I can track that down in Nifi via the logs (unfortunately PutTCP doesn't >>> log >>> client ports, but from the timestamp I'm sure it's this connection : >>> >>> 2017-09-17 14:20:10,837 DEBUG [Timer-Driven Process Thread-10] >>> o.apache.nifi.processors.standard.PutTCP >>> PutTCP[id=80231a39-1008-1159-a6fa-1f9e3751d608] No available connections, >>> creating a new one... 
>>> 2017-09-17 14:20:20,860 ERROR [Timer-Driven Process Thread-10] >>> o.apache.nifi.processors.standard.PutTCP >>> PutTCP[id=80231a39-1008-1159-a6fa-1f9e3751d608] No available connections, >>> and unable to create a new one, transferring >>> StandardFlowFileRecord[uuid=79f2a166-5211-4d2d-9275-03f0ce4d5b29,claim= >>> StandardContentClaim >>> [resourceClaim=StandardResourceClaim[id=1505641210025-1, >>> container=default, >>> section=1], offset=84519, length=9],offset=0,name= >>> 23934743676390659,size=9] >>> to failure: java.net.SocketTimeoutException: Timed out connecting to >>> 10.32.133.40:4001 >>> 2017-09-17 14:20:20,860 ERROR [Timer-Driven Process Thread-10] >>> o.apache.nifi.processors.standard.PutTCP >>> java.net.SocketTimeoutException: Timed out connecting to >>> 10.32.133.40:4001 >>> at >>> org.apache.nifi.processor.util.put.sender.SocketChannelSender.open( >>> SocketChannelSender.java:66) >>> ~[nifi-processor-utils-1.1.0.jar:1.1.0] >>> at >>> org.apache.nifi.processor.util.put.AbstractPutEventProcessor.createSender( >>> AbstractPutEventProcessor.java:312) >>> ~[nifi-processor-utils-1.1.0.jar:1.1.0] >>> at >>> org.apache.nifi.processors.standard.PutTCP.createSender(PutTCP.java:121) >>> [nifi-standard-processors-1.1.0.jar:1.1.0] >>> at >>> org.apache.nifi.processor.util.put.AbstractPutEventProcessor. >>> acquireSender(AbstractPutEventProcessor.java:334) >>> ~[nifi-proces
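For reference, one way to do that from the NiFi host (Linux flags shown; details vary by OS, and the process-matching pattern is an assumption):

    # Find the pid of the NiFi JVM, then list its TCP sockets.
    ps aux | grep '[o]rg.apache.nifi.NiFi'     # note the pid of the NiFi process
    netstat -anp | grep <nifi-pid>             # or: lsof -p <nifi-pid> -a -i TCP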
Re: org.apache.nifi.spark.NiFiReceiver and spark spark.streaming.receiver.maxRate
A typical production setup is to use Kafka in the middle.

Andrew

On Mon, Sep 18, 2017, 3:02 AM Margus Roo wrote:
> Hi
>
> I need to take a flow with Spark Streaming from a NiFi port. As we know,
> Spark supports spark.streaming.receiver.maxRate and
> spark.streaming.receiver.backpressure.
>
> It seems that org.apache.nifi.spark.NiFiReceiver does not support them at all.
>
> https://github.com/apache/nifi/tree/master/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark
> - the code is quite old too.
>
> My question is - what is the recommended approach today for getting a stream
> from NiFi with Spark? Is
> https://github.com/apache/nifi/tree/master/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark
> the best we have? If it is, then what is the best approach to integrate
> Spark maxRate or backpressure into it?
>
> --
> Margus (margusja) Roo
> http://margus.roo.ee
> skype: margusja
> https://www.facebook.com/allan.tuuring
> +372 51 48 780
org.apache.nifi.spark.NiFiReceiver and spark spark.streaming.receiver.maxRate
Hi

I need to take a flow with Spark Streaming from a NiFi port. As we know, Spark supports spark.streaming.receiver.maxRate and spark.streaming.receiver.backpressure.

It seems that org.apache.nifi.spark.NiFiReceiver does not support them at all.

https://github.com/apache/nifi/tree/master/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark - the code is quite old too.

My question is - what is the recommended approach today for getting a stream from NiFi with Spark? Is
https://github.com/apache/nifi/tree/master/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark
the best we have? If it is, then what is the best approach to integrate Spark maxRate or backpressure into it?

--
Margus (margusja) Roo
http://margus.roo.ee
skype: margusja
https://www.facebook.com/allan.tuuring
+372 51 48 780
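For context, this is roughly how the receiver is wired into Spark Streaming today, sketched from the nifi-spark-receiver module; the URL, port name, and rate values are placeholders. Whether the standard spark.streaming.receiver.maxRate and backpressure settings actually throttle it depends on how the receiver stores records, which is exactly the question raised above.

    import org.apache.nifi.remote.client.SiteToSiteClient;
    import org.apache.nifi.remote.client.SiteToSiteClientConfig;
    import org.apache.nifi.spark.NiFiDataPacket;
    import org.apache.nifi.spark.NiFiReceiver;
    import org.apache.spark.SparkConf;
    import org.apache.spark.storage.StorageLevel;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    public final class NiFiSparkStreamingSketch {

        public static void main(final String[] args) throws InterruptedException {
            // Placeholder URL and output-port name; these refer to a NiFi instance
            // with site-to-site enabled and an output port feeding Spark.
            final SiteToSiteClientConfig config = new SiteToSiteClient.Builder()
                    .url("http://nifi-host:8080/nifi")
                    .portName("Data for Spark")
                    .buildConfig();

            final SparkConf sparkConf = new SparkConf()
                    .setAppName("NiFi-Spark-Streaming")
                    // Standard Spark settings; whether they effectively limit
                    // NiFiReceiver is the open question in this thread.
                    .set("spark.streaming.backpressure.enabled", "true")
                    .set("spark.streaming.receiver.maxRate", "1000");

            final JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000L));

            final JavaReceiverInputDStream<NiFiDataPacket> packets =
                    ssc.receiverStream(new NiFiReceiver(config, StorageLevel.MEMORY_ONLY()));

            // Simple sink for the sketch: print the size of each received packet.
            packets.map(packet -> packet.getContent().length).print();

            ssc.start();
            ssc.awaitTermination();
        }
    }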