I've done something like this by wrapping the command in a shell script: http://ingest.tips/2014/12/22/getting-started-with-apache-nifi/
My use case was slightly different, but I'm pretty sure you can adapt the same idea.

-Joey

> On Oct 10, 2015, at 03:52, Parul Agrawal <parulagrawa...@gmail.com> wrote:
>
> Hi,
>
> I actually need to get the data from a pipe.
> So the actual command I would need is
>   mkfifo /tmp/packet; tshark -i ens160 -T pdml > /tmp/packet
> Is it possible to use ExecuteProcess for multiple commands?
>
>> On Sat, Oct 10, 2015 at 1:04 PM, Parul Agrawal <parulagrawa...@gmail.com>
>> wrote:
>> Hi,
>>
>> I added a custom Flume source, and when the Flume source sends data to the
>> Flume sink, the error below is thrown at the Flume sink.
>>
>> Administratively Yielded for 1 sec due to processing failure
>> 2015-10-10 02:30:45,027 WARN [Timer-Driven Process Thread-9] o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding ExecuteFlumeSink[id=2d08dfe7-4fd1-4a10-9d25-0b007a2c41bf] due to uncaught Exception: java.lang.IllegalStateException: close() called when transaction is OPEN - you must either commit or rollback first
>> 2015-10-10 02:30:45,028 WARN [Timer-Driven Process Thread-9] o.a.n.c.t.ContinuallyRunProcessorTask
>> java.lang.IllegalStateException: close() called when transaction is OPEN - you must either commit or rollback first
>>     at com.google.common.base.Preconditions.checkState(Preconditions.java:172) ~[guava-r05.jar:na]
>>     at org.apache.flume.channel.BasicTransactionSemantics.close(BasicTransactionSemantics.java:179) ~[flume-ng-core-1.6.0.jar:1.6.0]
>>     at org.apache.flume.sink.LoggerSink.process(LoggerSink.java:105) ~[flume-ng-core-1.6.0.jar:1.6.0]
>>     at org.apache.nifi.processors.flume.ExecuteFlumeSink.onTrigger(ExecuteFlumeSink.java:139) ~[na:na]
>>     at org.apache.nifi.processors.flume.AbstractFlumeProcessor.onTrigger(AbstractFlumeProcessor.java:148) ~[na:na]
>>     at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1077) ~[nifi-framework-core-0.3.0.jar:0.3.0]
>>     at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:127) [nifi-framework-core-0.3.0.jar:0.3.0]
>>     at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:49) [nifi-framework-core-0.3.0.jar:0.3.0]
>>     at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:119) [nifi-framework-core-0.3.0.jar:0.3.0]
>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_85]
>>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) [na:1.7.0_85]
>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) [na:1.7.0_85]
>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [na:1.7.0_85]
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_85]
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_85]
>>     at java.lang.Thread.run(Thread.java:745) [na:1.7.0_85]
>> 2015-10-10 02:30:46,029 ERROR [Timer-Driven Process Thread-9] o.a.n.processors.flume.ExecuteFlumeSink ExecuteFlumeSink[id=2d08dfe7-4fd1-4a10-9d25-0b007a2c41bf] ExecuteFlumeSink[id=2d08dfe7-4fd1-4a10-9d25-0b007a2c41bf] failed to process due to org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=8832b036-51a4-49cf-9703-fc4ed443ab80,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1444462207782-7, container=default, section=7], offset=180436, length=14078],offset=0,name=8311685679474355,size=14078] is not known in this session (StandardProcessSession[id=218318]); rolling back session: org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=8832b036-51a4-49cf-9703-fc4ed443ab80,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1444462207782-7, container=default, section=7], offset=180436, length=14078],offset=0,name=8311685679474355,size=14078] is not known in this session (StandardProcessSession[id=218318])
>>
>> Any idea what could be wrong here?
>>
>> Thanks and Regards,
>> Parul
>>
>>> On Fri, Oct 9, 2015 at 6:32 PM, Bryan Bende <bbe...@gmail.com> wrote:
>>> Hi Parul,
>>>
>>> I think it would be good to keep the convo going on the users list since
>>> there are more people who can offer help there, and it also helps everyone
>>> learn new solutions.
>>>
>>> The quick answer though is that NiFi has an ExecuteProcess processor which
>>> could execute "tshark -i eth0 -T pdml".
>>>
>>> There is not currently an XmlToJson processor, so this could be a place
>>> where you need a custom processor. For simple cases you can use an
>>> EvaluateXPath processor to extract values from the XML, and then a
>>> ReplaceText processor to build a new JSON document from those extracted
>>> values.
>>>
>>> -Bryan
>>>
>>>> On Fri, Oct 9, 2015 at 3:39 AM, Parul Agrawal <parulagrawa...@gmail.com>
>>>> wrote:
>>>> Hi,
>>>>
>>>> A little more to add:
>>>> I need to keep reading the flowfile until END_TAG is received, i.e. we may
>>>> need to concatenate the flowfile data until END_TAG,
>>>> and then convert it to JSON and call the PutFile processor.
>>>>
>>>> Thanks and Regards,
>>>> Parul
>>>>
>>>>> On Fri, Oct 9, 2015 at 10:56 AM, Parul Agrawal <parulagrawa...@gmail.com>
>>>>> wrote:
>>>>> Hi,
>>>>>
>>>>> Thank you very much again for the guidance provided.
>>>>> Basically I need a processor which converts an XML file to JSON.
>>>>>
>>>>> Currently I have a Flume source of type "exec", and the command
>>>>> used is "tshark -i eth0 -T pdml".
>>>>>
>>>>> Here the Flume source keeps sending data to the Flume sink.
>>>>> This flow file would be of PDML format.
>>>>>
>>>>> Now I need a processor which would do the following:
>>>>>
>>>>> 1) Form a complete XML file based on the START TAG (<packet>)
>>>>>    and END TAG (</packet>).
>>>>> 2) Once the XML message is formed, convert it to JSON.
>>>>> 3) Place the JSON file in a local directory using the PutFile processor.
>>>>>
>>>>> I am not sure if I was able to explain the processor requirement.
>>>>> It would be really great if you could help me with this.
>>>>>
>>>>> Thanks and Regards,
>>>>> Parul
>>>>>
>>>>>> On Thu, Oct 8, 2015 at 10:02 PM, Joey Echeverria <joe...@gmail.com>
>>>>>> wrote:
>>>>>> > If you plan to use NiFi for the long term, it might be worth investing
>>>>>> > in converting your custom Flume components to NiFi processors. We can
>>>>>> > help you get started if you need any guidance going that route.
>>>>>>
>>>>>> +1. Running Flume sources/sinks is meant as a transition step. It's
>>>>>> really useful if you have a complex Flume flow and want to migrate
>>>>>> only parts of it over to NiFi at a time. I would port any custom
>>>>>> sources and sinks to NiFi once you know that it will meet your needs
>>>>>> well. NiFi has a lot of documentation on writing processors, and the
>>>>>> concepts map pretty well if you're already familiar with Flume's
>>>>>> execution model.
>>>>>>
>>>>>> -Joey
>>>>>>
>>>>>> On Thu, Oct 8, 2015 at 9:48 AM, Bryan Bende <bbe...@gmail.com> wrote:
>>>>>> >
>>>>>> > Hi Parul,
>>>>>> >
>>>>>> > It is possible to deploy a custom Flume source/sink to NiFi, but due
>>>>>> > to the way the Flume processors load the classes for the sources and
>>>>>> > sinks, the jar you deploy to the lib directory also needs to include
>>>>>> > the other dependencies your source/sink needs (or they each need to
>>>>>> > individually be in lib/ directly).
>>>>>> >
>>>>>> > So here is a sample project I created that makes a shaded jar:
>>>>>> > https://github.com/bbende/my-flume-source
>>>>>> >
>>>>>> > It will contain the custom source and following dependencies all in
>>>>>> > one jar:
>>>>>> >
>>>>>> > org.apache.flume:my-flume-source:jar:1.0-SNAPSHOT
>>>>>> > +- org.apache.flume:flume-ng-sdk:jar:1.6.0:compile
>>>>>> > +- org.apache.flume:flume-ng-core:jar:1.6.0:compile
>>>>>> > +- org.apache.flume:flume-ng-configuration:jar:1.6.0:compile
>>>>>> > +- org.apache.flume:flume-ng-auth:jar:1.6.0:compile
>>>>>> > \- com.google.guava:guava:jar:11.0.2:compile
>>>>>> >    \- com.google.code.findbugs:jsr305:jar:1.3.9:compile
>>>>>> >
>>>>>> > I copied that to NiFi lib, restarted, created an ExecuteFlumeSource
>>>>>> > processor with the following config:
>>>>>> >
>>>>>> > Source Type = org.apache.flume.MySource
>>>>>> > Agent Name = a1
>>>>>> > Source Name = r1
>>>>>> > Flume Configuration = a1.sources = r1
>>>>>> >
>>>>>> > And I was getting the output in nifi/logs/nifi-bootstrap.log
>>>>>> >
>>>>>> > Keep in mind that this could become risky because any classes found in
>>>>>> > the lib directory would be accessible to all NARs in NiFi and would be
>>>>>> > found before classes within a NAR because the parent is checked first
>>>>>> > during class loading. This example isn't too risky because we are only
>>>>>> > bringing in flume jars and one guava jar, but for example if another
>>>>>> > nar uses a different version of guava this is going to cause a problem.
>>>>>> >
>>>>>> > If you plan to use NiFi for the long term, it might be worth investing
>>>>>> > in converting your custom Flume components to NiFi processors. We can
>>>>>> > help you get started if you need any guidance going that route.
>>>>>> >
>>>>>> > -Bryan
>>>>>> >
>>>>>> >
>>>>>> > On Thu, Oct 8, 2015 at 2:30 AM, Parul Agrawal
>>>>>> > <parulagrawa...@gmail.com> wrote:
>>>>>> >>
>>>>>> >> Hello Bryan,
>>>>>> >>
>>>>>> >> Thank you very much for your response.
>>>>>> >>
>>>>>> >> Is it possible to have a customized Flume source and sink in NiFi?
>>>>>> >> I have my own customized source and sink. I followed the steps below
>>>>>> >> to add my own customized source, but it did not work.
>>>>>> >>
>>>>>> >> 1) Created a Maven project and added the customized source. (flume.jar
>>>>>> >>    was created after this step)
>>>>>> >> 2) Added the flume.jar file to the nifi-0.3.0/lib folder.
>>>>>> >> 3) Added a Flume source processor with the configuration below:
>>>>>> >>
>>>>>> >> Property      Value
>>>>>> >> Source Type   com.flume.source.Source
>>>>>> >> Agent Name    a1
>>>>>> >> Source Name   k1
>>>>>> >>
>>>>>> >> But I am getting the error below in the Flume source processor:
>>>>>> >> "Failed to run validation due to java.lang.NoClassDefFoundError :
>>>>>> >> /org/apache/flume/PollableSource."
>>>>>> >>
>>>>>> >> Can you please help me in this regard? Is there any step or
>>>>>> >> configuration I missed?
>>>>>> >>
>>>>>> >> Thanks and Regards,
>>>>>> >> Parul
>>>>>> >>
>>>>>> >>
>>>>>> >> On Wed, Oct 7, 2015 at 6:57 PM, Bryan Bende <bbe...@gmail.com> wrote:
>>>>>> >>>
>>>>>> >>> Hello,
>>>>>> >>>
>>>>>> >>> The NiFi Flume processors are for running Flume sources and sinks
>>>>>> >>> within NiFi. They don't communicate with an external Flume process.
>>>>>> >>>
>>>>>> >>> In your example you would need an ExecuteFlumeSource configured to
>>>>>> >>> run the netcat source, connected to an ExecuteFlumeSink configured
>>>>>> >>> with the logger.
>>>>>> >>>
>>>>>> >>> -Bryan
>>>>>> >>>
>>>>>> >>> On Wednesday, October 7, 2015, Parul Agrawal
>>>>>> >>> <parulagrawa...@gmail.com> wrote:
>>>>>> >>>>
>>>>>> >>>> Hi,
>>>>>> >>>>
>>>>>> >>>> I was trying to run the NiFi Flume processor with the details
>>>>>> >>>> below, but could not bring it up.
>>>>>> >>>>
>>>>>> >>>> I already started Flume with the sample configuration file:
>>>>>> >>>> =============================================
>>>>>> >>>> # example.conf: A single-node Flume configuration
>>>>>> >>>>
>>>>>> >>>> # Name the components on this agent
>>>>>> >>>> a1.sources = r1
>>>>>> >>>> a1.sinks = k1
>>>>>> >>>> a1.channels = c1
>>>>>> >>>>
>>>>>> >>>> # Describe/configure the source
>>>>>> >>>> a1.sources.r1.type = netcat
>>>>>> >>>> a1.sources.r1.bind = localhost
>>>>>> >>>> a1.sources.r1.port = 44444
>>>>>> >>>>
>>>>>> >>>> # Describe the sink
>>>>>> >>>> a1.sinks.k1.type = logger
>>>>>> >>>>
>>>>>> >>>> # Use a channel which buffers events in memory
>>>>>> >>>> a1.channels.c1.type = memory
>>>>>> >>>> a1.channels.c1.capacity = 1000
>>>>>> >>>> a1.channels.c1.transactionCapacity = 100
>>>>>> >>>>
>>>>>> >>>> # Bind the source and sink to the channel
>>>>>> >>>> a1.sources.r1.channels = c1
>>>>>> >>>> a1.sinks.k1.channel = c1
>>>>>> >>>> =============================================
>>>>>> >>>>
>>>>>> >>>> Command used to start Flume:
>>>>>> >>>> $ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console
>>>>>> >>>>
>>>>>> >>>> In the NiFi browser, the following configuration was done for
>>>>>> >>>> ExecuteFlumeSink:
>>>>>> >>>>
>>>>>> >>>> Property    Value
>>>>>> >>>> Sink Type   logger
>>>>>> >>>> Agent Name  a1
>>>>>> >>>> Sink Name   k1
>>>>>> >>>>
>>>>>> >>>> An event is sent to Flume using:
>>>>>> >>>> $ telnet localhost 44444
>>>>>> >>>> Trying 127.0.0.1...
>>>>>> >>>> Connected to localhost.localdomain (127.0.0.1).
>>>>>> >>>> Escape character is '^]'.
>>>>>> >>>> Hello world! <ENTER>
>>>>>> >>>> OK
>>>>>> >>>>
>>>>>> >>>> But I could not get any data in the NiFi Flume processor. I request
>>>>>> >>>> your help with this.
>>>>>> >>>> Do I need to change the example.conf file of Flume so that the NiFi
>>>>>> >>>> Flume sink gets the data?
>>>>>> >>>>
>>>>>> >>>> Thanks and Regards,
>>>>>> >>>> Parul
>>>>>> >>>
>>>>>> >>> --
>>>>>> >>> Sent from Gmail Mobile
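
For the three steps listed in this thread (form a complete XML message from <packet> to </packet>, convert it to JSON, then write it out), here is a minimal Python sketch of the first two. It is not a NiFi processor and is not code from anyone on the list. The names `frame_packets`, `element_to_dict` and `packet_to_json`, the JSON layout, and the sample PDML lines are all assumptions made for illustration. It also assumes the PDML stream arrives line by line and that nothing useful follows </packet> on the same line.

```python
import json
import xml.etree.ElementTree as ET
from typing import Iterable, Iterator

START_TAG = "<packet>"
END_TAG = "</packet>"


def frame_packets(lines: Iterable[str]) -> Iterator[str]:
    """Yield one complete <packet>...</packet> string per packet.

    Lines outside a packet (XML declaration, <pdml> wrapper) are skipped.
    Assumption: any text after </packet> on the same line is discarded.
    """
    buffer = []
    inside = False
    for line in lines:
        if not inside and START_TAG in line:
            inside = True
            # Drop anything that precedes the start tag on this line.
            line = line[line.index(START_TAG):]
        if inside:
            if END_TAG in line:
                end = line.index(END_TAG) + len(END_TAG)
                buffer.append(line[:end])
                yield "".join(buffer)
                buffer = []
                inside = False
            else:
                buffer.append(line)


def element_to_dict(elem: ET.Element) -> dict:
    """Recursively turn an element into a dict (hypothetical layout)."""
    node = {"tag": elem.tag, "attributes": dict(elem.attrib)}
    children = [element_to_dict(child) for child in elem]
    if children:
        node["children"] = children
    return node


def packet_to_json(packet_xml: str) -> str:
    """Parse one framed packet and serialize it as a JSON string."""
    return json.dumps(element_to_dict(ET.fromstring(packet_xml)))


if __name__ == "__main__":
    # Hypothetical, heavily shortened PDML; real tshark output has more fields.
    sample = [
        '<?xml version="1.0"?>\n',
        "<pdml>\n",
        "<packet>\n",
        '  <proto name="eth">\n',
        '    <field name="eth.type" show="0x0800"/>\n',
        "  </proto>\n",
        "</packet>\n",
        "</pdml>\n",
    ]
    for packet in frame_packets(sample):
        print(packet_to_json(packet))
```

Because `frame_packets` accepts any iterable of lines, an open file object on the named pipe (for example /tmp/packet from the command quoted in this thread) could be passed in place of the sample list. Writing each JSON string to a directory would then cover step 3.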