RE: [EXT] Re: Cluster Peer Lists
Thanks Koji, exactly the answers I was looking for. I've switched over all the Remote Process Groups to use the list of nodes. --Peter

-----Original Message-----
From: Koji Kawamura [mailto:ijokaruma...@gmail.com]
Sent: Thursday, September 27, 2018 7:08 PM
To: users@nifi.apache.org
Subject: [EXT] Re: Cluster Peer Lists

Hi Peter,

The Site-to-Site client refreshes its remote peer list every 60 seconds.
https://github.com/apache/nifi/blob/master/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java#L60

The address configured when setting up an S2S client is only used to fetch the remote peer list initially. After that, the client knows node01, 02 and 03 are available peers, so when it refreshes the peer list, even if it fails to reach node01 it should retrieve the updated list from node02 or 03. However, as long as node01 stays in the remote cluster (until it is removed from the cluster, node02 and 03 still think it's a part of the cluster), the returned peer list contains node01.
https://github.com/apache/nifi/blob/master/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java#L383

Another thing to note is that the S2S client calculates destinations for the next 128 transactions in advance. So, if your client does not make transactions often, it may take longer to recalculate the next destination.
https://github.com/apache/nifi/blob/master/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/PeerSelector.java#L159

To avoid having a single host address in the S2S client configuration, you can use multiple addresses delimited by commas. With this, the S2S client can connect when it is restarted even if node01 is down. E.g. http://node01:8080,http://node02:8080,http://node03:8080

Alternatively, a round-robin DNS name or a reverse proxy for the bootstrap node address can be used to the same effect.
Thanks,
Koji

On Fri, Sep 28, 2018 at 4:30 AM Peter Wicks (pwicks) wrote:
>
> Hi NiFi team,
>
> We had one of the nodes in our cluster go offline today. We eventually resolved the issue, but it exposed some issues in our configuration across our edge NiFi instances.
>
> Right now we have non-clustered instances of NiFi distributed around the world, pushing data back to a three-node cluster via Site-to-Site. All of these instances use the name of the first node (node01), and pull back the peer list and weights from it. But node01 is the node that went offline today, and while some site-to-site connections appeared to use cached data and continued uploading data to node02 and node03, many of the site-to-site connections went down because they were not able to pull the peer list from the cluster, which makes perfect sense to me.
>
> One question that I was curious about: how long is a peer list cached for if an updated list can't be retrieved from the cluster?
>
> What are the best practices for fixing this? We were throwing around ideas of using a load balancer or round-robin DNS name as the entry point for site-to-site, but I figured others have probably already tackled this problem before and could share some ideas.
>
> Thanks,
>
> Peter
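The failover behavior Koji describes can be sketched in plain Java. This is a minimal, self-contained illustration, not the real PeerSelector code: a client holds several bootstrap addresses, tries each in turn when refreshing the peer list, and falls back to its cached list if every bootstrap node is unreachable. The `fetch` function is a stand-in for the actual HTTP peer-list request.

```java
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Sketch (not the real PeerSelector) of bootstrap failover: try each
// configured bootstrap address until one returns the current peer list,
// and keep the last successful list cached between refreshes.
public class BootstrapPeerSelector {
    private final List<String> bootstrapUrls;            // e.g. node01, node02, node03
    private final Function<String, List<String>> fetch;  // stand-in for the HTTP call
    private List<String> cachedPeers = Collections.emptyList();

    public BootstrapPeerSelector(List<String> bootstrapUrls,
                                 Function<String, List<String>> fetch) {
        this.bootstrapUrls = bootstrapUrls;
        this.fetch = fetch;
    }

    // Refresh the peer list, falling back across bootstrap nodes.
    // If all bootstrap nodes are down, serve the last known peer list.
    public List<String> refreshPeers() {
        for (String url : bootstrapUrls) {
            try {
                List<String> peers = fetch.apply(url);
                if (peers != null && !peers.isEmpty()) {
                    cachedPeers = peers;
                    return cachedPeers;
                }
            } catch (RuntimeException e) {
                // this bootstrap node is unreachable; try the next one
            }
        }
        return cachedPeers; // all bootstraps down: reuse cached peers
    }
}
```

This is the same effect Koji gets from the comma-delimited URL list (or a round-robin DNS name in front of the cluster): the client no longer depends on node01 alone being up at refresh or restart time.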
Re: POJO to Record
Moving the users list to BCC and adding the dev list, as this question is better suited for the devs.

Ed,

Does XMLReader not parse your XML? You'd need to know the schema, so if you don't (especially if it is dynamic) then you might be faced with a custom processor as you described.

Take a look at DataTypeUtils; it has methods toRecord() and convertRecordFieldtoObject() which help you convert between POJO <-> Record. The JoltTransformRecord processor uses these, so you can have a look at that code for an example.

Regards,
Matt

On Fri, Sep 28, 2018 at 11:28 AM Ed J wrote:
>
> Hi,
> I'm ingesting some XML with an overly complicated structure and I've built a custom Java DOM parser to wrangle it into a saner form. (Is that frowned upon - should I just use the built-in processors to wrangle it?)
>
> So my question is: I've parsed the XML into a simple POJO; how do I get that POJO into the next processor as a 'Record'? My custom NiFi processor's onTrigger looks like this:
>
> MyParser parser = new MyParser();
> MyPojo pojo = parser.parse(flowFileContents);
>
> // TODO: convert pojo to record
> Record myrecord = pojo.toRecord(); // how to do this?
> session.transfer(myrecord, REL_SUCCESS);
>
> I'm guessing I could convert the POJO to JSON, write that out to the next stage, and then use a JsonRecordReader to convert the JSON into a record. However, if I can go straight from POJO to record, that seems more efficient - just not sure how to go about it. Thanks.
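For readers wondering what "POJO to Record" amounts to conceptually: a NiFi Record is essentially a schema plus a map of field names to values. The plain-Java sketch below shows the reflection-based idea behind that conversion, without any NiFi dependency. The `Person` class is a made-up example; the real DataTypeUtils.toRecord() route Matt mentions additionally requires a RecordSchema and handles nested and typed fields properly.

```java
import java.lang.reflect.Field;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustration only: walk a POJO's declared fields via reflection and
// build a field-name -> value map, which is roughly what a NiFi Record
// wraps (minus the schema and type coercion).
public class PojoToMap {
    public static Map<String, Object> toMap(Object pojo) {
        Map<String, Object> values = new LinkedHashMap<>();
        for (Field f : pojo.getClass().getDeclaredFields()) {
            f.setAccessible(true);
            try {
                values.put(f.getName(), f.get(pojo));
            } catch (IllegalAccessException e) {
                throw new IllegalStateException("cannot read field " + f.getName(), e);
            }
        }
        return values;
    }

    // Hypothetical POJO standing in for whatever the custom DOM parser produces.
    static class Person {
        final String name = "Ed";
        final int id = 42;
    }
}
```

In a processor you would pair such a value map with a RecordSchema to build an actual Record, which is what the DataTypeUtils helpers take care of for you.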
Re: ConsumeMQTT processor causing resource issues when deployed in MiNiFi
It certainly could be that memory on the machine is constrained enough that creation of a new thread isn't possible. It could also be a resource leak. To Aldrin's point, though, I'd say the thread dump would be really helpful. In fact, I'd recommend grabbing a few at different time intervals, such as shortly after launch, after a while when things are working well, and when it hits this point. How much memory is on the system, and how much is configured for the JVM?

On Fri, Sep 28, 2018 at 9:30 AM Aldrin Piri wrote:
>
> Hello Patricia,
>
> Could you provide more details about the JVM properties that you tried configuring? Please also let us know the relevant versions of MiNiFi and NiFi you are using. If possible, providing the flow you are attempting to run would be helpful. Otherwise, specifics about how the consume processor is configured would give a few more data points.
>
> Additionally, I would like to request that you grab us a thread dump from MiNiFi when this issue is exhibited. This can be accomplished by running bin/minifi.sh dump .
>
> At its core, and beyond the initial configuration startup, MiNiFi runs primarily the same core libraries as NiFi, and I would expect similar JVM configurations to have similar success in operating. The inability to create new threads is a bit of a curious one that I have not seen, especially when the same flow is not at issue in NiFi.
>
> Thanks!
> --aldrin
>
> On Fri, Sep 28, 2018 at 5:04 AM Patricia Quill wrote:
>>
>> I have deployed a ConsumeMQTT processor to a remote server using MiNiFi (Java agent) and configured it to subscribe to a Mosquitto broker. The problem I am seeing is that it is causing the OS to run out of resources.
>>
>> 2018-09-27 09:06:37,060 WARN [Timer-Driven Process Thread-9] o.a.n.controller.tasks.ConnectableTask Administratively Yielding ConsumeMQTT[id=b3271a8d-cc15-3f10--] due to uncaught Exception: java.lang.OutOfMemoryError: unable to create new native thread
>> java.lang.OutOfMemoryError: unable to create new native thread
>>   at java.lang.Thread.start0(Native Method)
>>   at java.lang.Thread.start(Thread.java:717)
>>   at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:957)
>>   at java.util.concurrent.ThreadPoolExecutor.ensurePrestart(ThreadPoolExecutor.java:1603)
>>   at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:334)
>>   at java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533)
>>   at java.util.concurrent.ScheduledThreadPoolExecutor.execute(ScheduledThreadPoolExecutor.java:622)
>>   at org.eclipse.paho.client.mqttv3.internal.ClientComms$ConnectBG.start(ClientComms.java:675)
>>   at org.eclipse.paho.client.mqttv3.internal.ClientComms.connect(ClientComms.java:280)
>>   at org.eclipse.paho.client.mqttv3.internal.ConnectActionListener.connect(ConnectActionListener.java:185)
>>   at org.eclipse.paho.client.mqttv3.MqttAsyncClient.connect(MqttAsyncClient.java:774)
>>   at org.eclipse.paho.client.mqttv3.MqttClient.connect(MqttClient.java:333)
>>   at org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor.setAndConnectClient(AbstractMQTTProcessor.java:370)
>>   at org.apache.nifi.processors.mqtt.ConsumeMQTT.reconnect(ConsumeMQTT.java:347)
>>   at org.apache.nifi.processors.mqtt.ConsumeMQTT.onTrigger(ConsumeMQTT.java:256)
>>   at org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor.onTrigger(AbstractMQTTProcessor.java:355)
>>   at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1165)
>>   at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:203)
>>   at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
>>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>   at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>>   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>>   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>   at java.lang.Thread.run(Thread.java:748)
>>
>> I've tried it on a number of different OS types - Ubuntu 14.04, Ubuntu 16.04 and CentOS 7. I have tried tweaking the JVM properties, but to no avail. I am using NiFi 1.7.0.
>> The MiNiFi setup seems to be OK, as I tried a test with the TailFile processor and had no issues. The ConsumeMQTT processor works without any issues if it's deployed on the NiFi server.
>>
>> Has anyone else come across this or have any suggestions?
ConsumeMQTT processor causing resource issues when deployed in MiNiFi
Have deployed a ConsumeMQTT processor to a remote server using MiNiFi (Java agent) and configured it to subscribe to a Mosquitto broker. The problem I am seeing is that it is causing the OS to run out of resources.

2018-09-27 09:06:37,060 WARN [Timer-Driven Process Thread-9] o.a.n.controller.tasks.ConnectableTask Administratively Yielding ConsumeMQTT[id=b3271a8d-cc15-3f10--] due to uncaught Exception: java.lang.OutOfMemoryError: unable to create new native thread
java.lang.OutOfMemoryError: unable to create new native thread
  at java.lang.Thread.start0(Native Method)
  at java.lang.Thread.start(Thread.java:717)
  at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:957)
  at java.util.concurrent.ThreadPoolExecutor.ensurePrestart(ThreadPoolExecutor.java:1603)
  at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:334)
  at java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533)
  at java.util.concurrent.ScheduledThreadPoolExecutor.execute(ScheduledThreadPoolExecutor.java:622)
  at org.eclipse.paho.client.mqttv3.internal.ClientComms$ConnectBG.start(ClientComms.java:675)
  at org.eclipse.paho.client.mqttv3.internal.ClientComms.connect(ClientComms.java:280)
  at org.eclipse.paho.client.mqttv3.internal.ConnectActionListener.connect(ConnectActionListener.java:185)
  at org.eclipse.paho.client.mqttv3.MqttAsyncClient.connect(MqttAsyncClient.java:774)
  at org.eclipse.paho.client.mqttv3.MqttClient.connect(MqttClient.java:333)
  at org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor.setAndConnectClient(AbstractMQTTProcessor.java:370)
  at org.apache.nifi.processors.mqtt.ConsumeMQTT.reconnect(ConsumeMQTT.java:347)
  at org.apache.nifi.processors.mqtt.ConsumeMQTT.onTrigger(ConsumeMQTT.java:256)
  at org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor.onTrigger(AbstractMQTTProcessor.java:355)
  at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1165)
  at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:203)
  at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)

I've tried it on a number of different OS types - Ubuntu 14.04, Ubuntu 16.04 and CentOS 7. I have tried tweaking the JVM properties, but to no avail. I am using NiFi 1.7.0.
The MiNiFi setup seems to be OK, as I tried a test with the TailFile processor and had no issues. The ConsumeMQTT processor works without any issues if it's deployed on the NiFi server.

Has anyone else come across this or have any suggestions?
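The stack trace above shows the Paho MQTT client starting a new background thread on every reconnect attempt; if those threads are never released, the process eventually hits the OS thread limit and throws "unable to create new native thread". A cheap way to confirm that kind of leak from inside the JVM, complementing the periodic bin/minifi.sh dump thread dumps suggested above, is to sample the live thread count against a startup baseline. This is a generic monitoring sketch, not MiNiFi code; the threshold of 2x is an arbitrary example.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;

// Sketch: watch the JVM's live thread count for the steady growth that
// precedes "unable to create new native thread".
public class ThreadLeakWatch {
    private static final ThreadMXBean THREADS = ManagementFactory.getThreadMXBean();

    // Current number of live threads in this JVM.
    public static int liveThreads() {
        return THREADS.getThreadCount();
    }

    // Crude leak signal: the live thread count has grown past the given
    // multiple of the baseline taken at startup.
    public static boolean looksLeaky(int baseline, double growthFactor) {
        return liveThreads() > baseline * growthFactor;
    }

    public static void main(String[] args) {
        int baseline = liveThreads();
        System.out.printf("baseline live threads: %d%n", baseline);
        // In practice you would sample this every minute or so and take a
        // full thread dump the first time looksLeaky(...) returns true.
        System.out.println("leaky now? " + looksLeaky(baseline, 2.0));
    }
}
```

When the signal fires, the thread dump will show which pool the extra threads belong to (here, they would be Paho connect threads), which narrows the leak down much faster than the OutOfMemoryError alone.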