I've done something like this by wrapping the command in a shell script:

http://ingest.tips/2014/12/22/getting-started-with-apache-nifi/

My use case was slightly different, but I'm pretty sure you can adapt the same 
idea. 
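
For the mkfifo/tshark case you describe below, the wrapper could look roughly 
like this (an untested sketch -- the fifo path and interface name are just the 
values from your message, so adjust them to your environment):

    #!/bin/sh
    # capture.sh -- wrapper so a single command can run the whole pipeline
    FIFO=/tmp/packet
    # create the named pipe only if it doesn't exist yet
    [ -p "$FIFO" ] || mkfifo "$FIFO"
    # tshark writes PDML to the fifo in the background...
    tshark -i ens160 -T pdml > "$FIFO" &
    # ...and we stream the fifo to stdout, which ExecuteProcess captures
    cat "$FIFO"

Then configure ExecuteProcess to run the script instead of trying to chain 
multiple commands in one property. (If all you need is the PDML on stdout, 
running tshark directly may be enough and the fifo can be skipped entirely.)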

-Joey

> On Oct 10, 2015, at 03:52, Parul Agrawal <parulagrawa...@gmail.com> wrote:
> 
> Hi,
> 
> I actually need to get the data from a pipe.
> So the actual command I would need is: mkfifo /tmp/packet; tshark -i ens160 -T 
> pdml > /tmp/packet
> Is it possible to use ExecuteProcess for multiple commands? 
> 
>> On Sat, Oct 10, 2015 at 1:04 PM, Parul Agrawal <parulagrawa...@gmail.com> 
>> wrote:
>> Hi,
>> 
>> I added a custom Flume source, and when the Flume source sends data to the 
>> Flume sink, the error below is thrown at the Flume sink.
>> 
>>  Administratively Yielded for 1 sec due to processing failure
>> 2015-10-10 02:30:45,027 WARN [Timer-Driven Process Thread-9] 
>> o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding 
>> ExecuteFlumeSink[id=2d08dfe7-4fd1-4a10-9d25-0b007a2c41bf] due to uncaught 
>> Exception: java.lang.IllegalStateException: close() called when transaction 
>> is OPEN - you must either commit or rollback first
>> 2015-10-10 02:30:45,028 WARN [Timer-Driven Process Thread-9] 
>> o.a.n.c.t.ContinuallyRunProcessorTask
>> java.lang.IllegalStateException: close() called when transaction is OPEN - 
>> you must either commit or rollback first
>>         at 
>> com.google.common.base.Preconditions.checkState(Preconditions.java:172) 
>> ~[guava-r05.jar:na]
>>         at 
>> org.apache.flume.channel.BasicTransactionSemantics.close(BasicTransactionSemantics.java:179)
>>  ~[flume-ng-core-1.6.0.jar:1.6.0]
>>         at org.apache.flume.sink.LoggerSink.process(LoggerSink.java:105) 
>> ~[flume-ng-core-1.6.0.jar:1.6.0]
>>         at 
>> org.apache.nifi.processors.flume.ExecuteFlumeSink.onTrigger(ExecuteFlumeSink.java:139)
>>  ~[na:na]
>>         at 
>> org.apache.nifi.processors.flume.AbstractFlumeProcessor.onTrigger(AbstractFlumeProcessor.java:148)
>>  ~[na:na]
>>         at 
>> org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1077)
>>  ~[nifi-framework-core-0.3.0.jar:0.3.0]
>>         at 
>> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:127)
>>  [nifi-framework-core-0.3.0.jar:0.3.0]
>>         at 
>> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:49)
>>  [nifi-framework-core-0.3.0.jar:0.3.0]
>>         at 
>> org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:119)
>>  [nifi-framework-core-0.3.0.jar:0.3.0]
>>         at 
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) 
>> [na:1.7.0_85]
>>         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) 
>> [na:1.7.0_85]
>>         at 
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>>  [na:1.7.0_85]
>>         at 
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>  [na:1.7.0_85]
>>         at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>  [na:1.7.0_85]
>>         at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>  [na:1.7.0_85]
>>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_85]
>> 2015-10-10 02:30:46,029 ERROR [Timer-Driven Process Thread-9] 
>> o.a.n.processors.flume.ExecuteFlumeSink 
>> ExecuteFlumeSink[id=2d08dfe7-4fd1-4a10-9d25-0b007a2c41bf] 
>> ExecuteFlumeSink[id=2d08dfe7-4fd1-4a10-9d25-0b007a2c41bf] failed to process 
>> due to org.apache.nifi.processor.exception.FlowFileHandlingException: 
>> StandardFlowFileRecord[uuid=8832b036-51a4-49cf-9703-fc4ed443ab80,claim=StandardContentClaim
>>  [resourceClaim=StandardResourceClaim[id=1444462207782-7, container=default, 
>> section=7], offset=180436, 
>> length=14078],offset=0,name=8311685679474355,size=14078] is not known in 
>> this session (StandardProcessSession[id=218318]); rolling back session: 
>> org.apache.nifi.processor.exception.FlowFileHandlingException: 
>> StandardFlowFileRecord[uuid=8832b036-51a4-49cf-9703-fc4ed443ab80,claim=StandardContentClaim
>>  [resourceClaim=StandardResourceClaim[id=1444462207782-7, container=default, 
>> section=7], offset=180436, 
>> length=14078],offset=0,name=8311685679474355,size=14078] is not known in 
>> this session (StandardProcessSession[id=218318])
>> 
>> Any idea what could be wrong here?
>> 
>> Thanks and Regards,
>> Parul
>> 
>> 
>>> On Fri, Oct 9, 2015 at 6:32 PM, Bryan Bende <bbe...@gmail.com> wrote:
>>> Hi Parul,
>>> 
>>> I think it would be good to keep the convo going on the users list since 
>>> there are more people who can offer help there, and it also helps everyone 
>>> learn new solutions.
>>> 
>>> The quick answer though is that NiFi has an ExecuteProcess processor which 
>>> could execute "tshark -i eth0 -T pdml". 
>>> 
>>> There is not currently an XmlToJson processor, so this could be a place 
>>> where you need a custom processor. For simple cases you can use an 
>>> EvaluateXPath processor to extract values from the XML, and then a 
>>> ReplaceText processor to build a new JSON document from those extracted 
>>> values.
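>>> 
>>> Very roughly, and only as an illustration (the attribute name and XPath 
>>> expression here are made up, and exact property names can vary between NiFi 
>>> versions):
>>> 
>>> EvaluateXPath
>>> Destination            flowfile-attribute
>>> packet.protocol        string(//proto/@name)     (user-defined property)
>>> 
>>> ReplaceText
>>> Replacement Value      { "protocol": "${packet.protocol}" }
>>> 
>>> EvaluateXPath puts each extracted value into a FlowFile attribute, and 
>>> ReplaceText can then rewrite the content as JSON by referencing those 
>>> attributes with the expression language.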
>>> 
>>> -Bryan
>>> 
>>> 
>>>> On Fri, Oct 9, 2015 at 3:39 AM, Parul Agrawal <parulagrawa...@gmail.com> 
>>>> wrote:
>>>> Hi,
>>>> 
>>>> A little more to add:
>>>> I need to keep reading the FlowFile until the END_TAG is received, i.e. we may 
>>>> need to concatenate the FlowFile data until the END_TAG,
>>>> and then convert it to JSON and pass it to the PutFile processor.
>>>> 
>>>> Thanks and Regards,
>>>> Parul
>>>> 
>>>> 
>>>> 
>>>>> On Fri, Oct 9, 2015 at 10:56 AM, Parul Agrawal <parulagrawa...@gmail.com> 
>>>>> wrote:
>>>>> Hi,
>>>>> 
>>>>> Thank you very much again for the guidance provided.
>>>>> Basically I would need a processor which would convert an XML file to JSON.
>>>>> 
>>>>> Currently I have a Flume source which is of type "exec" and the command 
>>>>> used is "tshark -i eth0 -T pdml".
>>>>> 
>>>>> Here the Flume source keeps sending data to the Flume sink. This flow file 
>>>>> would be in PDML format.
>>>>> 
>>>>> Now I need a processor which would do the following
>>>>> 
>>>>> 1) Form a complete XML file based on START TAG (<packet>)
>>>>> and END TAG (</packet>)
>>>>> 2) Once the XML message is formed, convert it to JSON.
>>>>> 3) Place the JSON file in a local directory using the PutFile processor.
>>>>> 
>>>>> I am not sure if I was able to explain the processor requirement clearly. 
>>>>> It would be really great if you could help me with this.
>>>>> 
>>>>> Thanks and Regards,
>>>>> Parul 
>>>>> 
>>>>> 
>>>>>> On Thu, Oct 8, 2015 at 10:02 PM, Joey Echeverria <joe...@gmail.com> 
>>>>>> wrote:
>>>>>> > If you plan to use NiFi for the long term, it might be worth investing 
>>>>>> > in converting your custom Flume components to NiFi processors. We can 
>>>>>> > help you get started if you need any guidance going that route.
>>>>>> 
>>>>>> +1. Running Flume sources/sinks is meant as a transition step. It's
>>>>>> really useful if you have a complex Flume flow and want to migrate
>>>>>> only parts of it over to NiFi at a time. I would port any custom
>>>>>> sources and sinks to NiFi once you knew that it would meet your needs
>>>>>> well. NiFi has a lot of documentation on writing processors and the
>>>>>> concepts map pretty well if you're already familiar with Flume's
>>>>>> execution model.
>>>>>> 
>>>>>> -Joey
>>>>>> 
>>>>>> On Thu, Oct 8, 2015 at 9:48 AM, Bryan Bende <bbe...@gmail.com> wrote:
>>>>>> >
>>>>>> > Hi Parul,
>>>>>> >
>>>>>> > It is possible to deploy a custom Flume source/sink to NiFi, but due 
>>>>>> > to the way the Flume processors load the classes for the sources and 
>>>>>> > sinks, the jar you deploy to the lib directory also needs to include 
>>>>>> > the other dependencies your source/sink needs (or each dependency needs 
>>>>>> > to be placed directly in lib/).
>>>>>> >
>>>>>> > So here is a sample project I created that makes a shaded jar:
>>>>>> > https://github.com/bbende/my-flume-source
>>>>>> >
>>>>>> > It will contain the custom source and the following dependencies all in 
>>>>>> > one jar:
>>>>>> >
>>>>>> > org.apache.flume:my-flume-source:jar:1.0-SNAPSHOT
>>>>>> > +- org.apache.flume:flume-ng-sdk:jar:1.6.0:compile
>>>>>> > +- org.apache.flume:flume-ng-core:jar:1.6.0:compile
>>>>>> > +- org.apache.flume:flume-ng-configuration:jar:1.6.0:compile
>>>>>> > +- org.apache.flume:flume-ng-auth:jar:1.6.0:compile
>>>>>> >   \- com.google.guava:guava:jar:11.0.2:compile
>>>>>> >      \- com.google.code.findbugs:jsr305:jar:1.3.9:compile
>>>>>> >
>>>>>> > I copied that to NiFi lib, restarted, created an ExecuteFlumeSource 
>>>>>> > processor with the following config:
>>>>>> >
>>>>>> > Source Type = org.apache.flume.MySource
>>>>>> > Agent Name = a1
>>>>>> > Source Name = r1
>>>>>> > Flume Configuration = a1.sources = r1
>>>>>> >
>>>>>> > And I was getting the output in nifi/logs/nifi-bootstrap.log
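>>>>>> >
>>>>>> > In concrete terms, that deploy amounted to roughly the following (the jar 
>>>>>> > name and NIFI_HOME are illustrative -- adjust to your build and install):
>>>>>> >
>>>>>> > mvn clean package
>>>>>> > cp target/my-flume-source-1.0-SNAPSHOT.jar $NIFI_HOME/lib/
>>>>>> > $NIFI_HOME/bin/nifi.sh restart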
>>>>>> >
>>>>>> > Keep in mind that this could become risky because any classes found in 
>>>>>> > the lib directory would be accessible to all NARs in NiFi and would be 
>>>>>> > found before classes within a NAR because the parent is checked first 
>>>>>> > during class loading. This example isn't too risky because we are only 
>>>>>> > bringing in Flume jars and one Guava jar, but, for example, if another 
>>>>>> > NAR uses a different version of Guava this is going to cause a problem.
>>>>>> >
>>>>>> > If you plan to use NiFi for the long term, it might be worth investing 
>>>>>> > in converting your custom Flume components to NiFi processors. We can 
>>>>>> > help you get started if you need any guidance going that route.
>>>>>> >
>>>>>> > -Bryan
>>>>>> >
>>>>>> >
>>>>>> > On Thu, Oct 8, 2015 at 2:30 AM, Parul Agrawal 
>>>>>> > <parulagrawa...@gmail.com> wrote:
>>>>>> >>
>>>>>> >> Hello Bryan,
>>>>>> >>
>>>>>> >> Thank you very much for your response.
>>>>>> >>
>>>>>> >> Is it possible to have a customized Flume source and sink in NiFi?
>>>>>> >> I have my own customized source and sink. I followed the steps below to 
>>>>>> >> add my own customized source, but it did not work.
>>>>>> >>
>>>>>> >> 1) Created a Maven project and added the customized source (flume.jar was 
>>>>>> >> created by this step).
>>>>>> >> 2) Added the flume.jar file to the nifi-0.3.0/lib folder.
>>>>>> >> 3) Added a Flume source processor with the configuration below:
>>>>>> >>
>>>>>> >> Property           Value
>>>>>> >> Source Type         com.flume.source.Source
>>>>>> >> Agent Name      a1
>>>>>> >> Source Name         k1.
>>>>>> >>
>>>>>> >> But I am getting the error below in the Flume source processor:
>>>>>> >> "Failed to run validation due to java.lang.NoClassDefFoundError : 
>>>>>> >> /org/apache/flume/PollableSource."
>>>>>> >>
>>>>>> >> Can you please help me in this regard? Is there any step or configuration 
>>>>>> >> I missed?
>>>>>> >>
>>>>>> >> Thanks and Regards,
>>>>>> >> Parul
>>>>>> >>
>>>>>> >>
>>>>>> >> On Wed, Oct 7, 2015 at 6:57 PM, Bryan Bende <bbe...@gmail.com> wrote:
>>>>>> >>>
>>>>>> >>> Hello,
>>>>>> >>>
>>>>>> >>> The NiFi Flume processors are for running Flume sources and sinks 
>>>>>> >>> within NiFi. They don't communicate with an external Flume process.
>>>>>> >>>
>>>>>> >>> In your example you would need an ExecuteFlumeSource configured to 
>>>>>> >>> run the netcat source, connected to an ExecuteFlumeSink configured 
>>>>>> >>> with the logger.
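>>>>>> >>>
>>>>>> >>> Concretely, something like this should be close (using the same property 
>>>>>> >>> names as the configs elsewhere in this thread; the netcat-specific lines 
>>>>>> >>> go in the Flume Configuration property):
>>>>>> >>>
>>>>>> >>> ExecuteFlumeSource
>>>>>> >>> Source Type            netcat
>>>>>> >>> Agent Name             a1
>>>>>> >>> Source Name            r1
>>>>>> >>> Flume Configuration    a1.sources.r1.bind = localhost
>>>>>> >>>                        a1.sources.r1.port = 44444
>>>>>> >>>
>>>>>> >>> ExecuteFlumeSink
>>>>>> >>> Sink Type              logger
>>>>>> >>> Agent Name             a1
>>>>>> >>> Sink Name              k1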
>>>>>> >>>
>>>>>> >>> -Bryan
>>>>>> >>>
>>>>>> >>> On Wednesday, October 7, 2015, Parul Agrawal 
>>>>>> >>> <parulagrawa...@gmail.com> wrote:
>>>>>> >>>>
>>>>>> >>>> Hi,
>>>>>> >>>>
>>>>>> >>>> I was trying to run the NiFi Flume processor with the details mentioned
>>>>>> >>>> below but could not bring it up.
>>>>>> >>>>
>>>>>> >>>> I already started Flume with the sample configuration file:
>>>>>> >>>> =============================================
>>>>>> >>>> # example.conf: A single-node Flume configuration
>>>>>> >>>>
>>>>>> >>>> # Name the components on this agent
>>>>>> >>>> a1.sources = r1
>>>>>> >>>> a1.sinks = k1
>>>>>> >>>> a1.channels = c1
>>>>>> >>>>
>>>>>> >>>> # Describe/configure the source
>>>>>> >>>> a1.sources.r1.type = netcat
>>>>>> >>>> a1.sources.r1.bind = localhost
>>>>>> >>>> a1.sources.r1.port = 44444
>>>>>> >>>>
>>>>>> >>>> # Describe the sink
>>>>>> >>>> a1.sinks.k1.type = logger
>>>>>> >>>>
>>>>>> >>>> # Use a channel which buffers events in memory
>>>>>> >>>> a1.channels.c1.type = memory
>>>>>> >>>> a1.channels.c1.capacity = 1000
>>>>>> >>>> a1.channels.c1.transactionCapacity = 100
>>>>>> >>>>
>>>>>> >>>> # Bind the source and sink to the channel
>>>>>> >>>> a1.sources.r1.channels = c1
>>>>>> >>>> a1.sinks.k1.channel = c1
>>>>>> >>>> =============================================
>>>>>> >>>>
>>>>>> >>>> Command used to start flume : $ bin/flume-ng agent --conf conf
>>>>>> >>>> --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console
>>>>>> >>>>
>>>>>> >>>> In the NiFi UI, the following configuration was done for ExecuteFlumeSink:
>>>>>> >>>> Property           Value
>>>>>> >>>> Sink Type         logger
>>>>>> >>>> Agent Name      a1
>>>>>> >>>> Sink Name         k1.
>>>>>> >>>>
>>>>>> >>>> An event is sent to Flume using:
>>>>>> >>>> $ telnet localhost 44444
>>>>>> >>>> Trying 127.0.0.1...
>>>>>> >>>> Connected to localhost.localdomain (127.0.0.1).
>>>>>> >>>> Escape character is '^]'.
>>>>>> >>>> Hello world! <ENTER>
>>>>>> >>>> OK
>>>>>> >>>>
>>>>>> >>>> But I could not get any data in the NiFi Flume processor. I would 
>>>>>> >>>> appreciate your help with this.
>>>>>> >>>> Do I need to change Flume's example.conf file so that the NiFi Flume 
>>>>>> >>>> sink gets the data?
>>>>>> >>>>
>>>>>> >>>> Thanks and Regards,
>>>>>> >>>> Parul
>>>>>> >>>
>>>>>> >>>
>>>>>> >>>
>>>>>> >>> --
>>>>>> >>> Sent from Gmail Mobile
>>>>>> >>
>>>>>> >>
>>>>>> >
> 
