Hi,

I added a custom Flume source, and when the source sends data to the Flume sink, the error below is thrown at the sink.

Administratively Yielded for 1 sec due to processing failure

2015-10-10 02:30:45,027 WARN [Timer-Driven Process Thread-9] o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding ExecuteFlumeSink[id=2d08dfe7-4fd1-4a10-9d25-0b007a2c41bf] due to uncaught Exception: java.lang.IllegalStateException: close() called when transaction is OPEN - you must either commit or rollback first
2015-10-10 02:30:45,028 WARN [Timer-Driven Process Thread-9] o.a.n.c.t.ContinuallyRunProcessorTask
java.lang.IllegalStateException: close() called when transaction is OPEN - you must either commit or rollback first
    at com.google.common.base.Preconditions.checkState(Preconditions.java:172) ~[guava-r05.jar:na]
    at org.apache.flume.channel.BasicTransactionSemantics.close(BasicTransactionSemantics.java:179) ~[flume-ng-core-1.6.0.jar:1.6.0]
    at org.apache.flume.sink.LoggerSink.process(LoggerSink.java:105) ~[flume-ng-core-1.6.0.jar:1.6.0]
    at org.apache.nifi.processors.flume.ExecuteFlumeSink.onTrigger(ExecuteFlumeSink.java:139) ~[na:na]
    at org.apache.nifi.processors.flume.AbstractFlumeProcessor.onTrigger(AbstractFlumeProcessor.java:148) ~[na:na]
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1077) ~[nifi-framework-core-0.3.0.jar:0.3.0]
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:127) [nifi-framework-core-0.3.0.jar:0.3.0]
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:49) [nifi-framework-core-0.3.0.jar:0.3.0]
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:119) [nifi-framework-core-0.3.0.jar:0.3.0]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_85]
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) [na:1.7.0_85]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) [na:1.7.0_85]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [na:1.7.0_85]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_85]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_85]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_85]
2015-10-10 02:30:46,029 ERROR [Timer-Driven Process Thread-9] o.a.n.processors.flume.ExecuteFlumeSink ExecuteFlumeSink[id=2d08dfe7-4fd1-4a10-9d25-0b007a2c41bf] ExecuteFlumeSink[id=2d08dfe7-4fd1-4a10-9d25-0b007a2c41bf] failed to process due to org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=8832b036-51a4-49cf-9703-fc4ed443ab80,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1444462207782-7, container=default, section=7], offset=180436, length=14078],offset=0,name=8311685679474355,size=14078] is not known in this session (StandardProcessSession[id=218318]); rolling back session: org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=8832b036-51a4-49cf-9703-fc4ed443ab80,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1444462207782-7, container=default, section=7], offset=180436, length=14078],offset=0,name=8311685679474355,size=14078] is not known in this session (StandardProcessSession[id=218318])

Any idea what could be wrong in this?

Thanks and Regards,
Parul

On Fri, Oct 9, 2015 at 6:32 PM, Bryan Bende <bbe...@gmail.com> wrote:

> Hi Parul,
>
> I think it would be good to keep the convo going on the users list since
> there are more people who can offer help there, and also helps everyone
> learn new solutions.
>
> The quick answer though is that NiFi has an ExecuteProcess processor which
> could execute "tshark -i eth0 -T pdml".
>
> There is not currently an XmlToJson processor, so this could be a place
> where you need a custom processor.
> For simple cases you can use an
> EvaluateXPath processor to extract values from the XML, and then a
> ReplaceText processor to build a new json document from those extracted
> values.
>
> -Bryan
>
>
> On Fri, Oct 9, 2015 at 3:39 AM, Parul Agrawal <parulagrawa...@gmail.com>
> wrote:
>
>> Hi,
>>
>> A little more to add.
>> I need to keep reading the flowfile till END_TAG is received, i.e. we
>> may need to concatenate the flowfile data till END_TAG,
>> and then convert it to json and call the PutFile() processor.
>>
>> Thanks and Regards,
>> Parul
>>
>>
>>
>> On Fri, Oct 9, 2015 at 10:56 AM, Parul Agrawal <parulagrawa...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> Thank you very much again for the guidance provided.
>>> Basically I would need a processor which would convert an XML file to JSON.
>>>
>>> Currently I have a flume source which is of type "exec" and the command
>>> used is "tshark -i eth0 -T pdml".
>>>
>>> Here the Flume source keeps sending data to the flume sink. This flow file
>>> would be in PDML format.
>>>
>>> Now I need a processor which would do the following:
>>>
>>> 1) Form a complete XML file based on START TAG (<packet>)
>>> and END TAG (</packet>)
>>> 2) Once the XML message is formed, convert it to json.
>>> 3) Place a json file in a local directory using the PutFile() processor.
>>>
>>> I am not sure if I was able to explain the processor requirement.
>>> It would be really great if you could help me with this.
>>>
>>> Thanks and Regards,
>>> Parul
>>>
>>>
>>> On Thu, Oct 8, 2015 at 10:02 PM, Joey Echeverria <joe...@gmail.com>
>>> wrote:
>>>
>>>> > If you plan to use NiFi for the long term, it might be worth
>>>> > investing in converting your custom Flume components to NiFi processors. We
>>>> > can help you get started if you need any guidance going that route.
>>>>
>>>> +1. Running Flume sources/sinks is meant as a transition step. It's
>>>> really useful if you have a complex Flume flow and want to migrate
>>>> only parts of it over to NiFi at a time.
>>>> I would port any custom
>>>> sources and sinks to NiFi once you knew that it would meet your needs
>>>> well. NiFi has a lot of documentation on writing processors and the
>>>> concepts map pretty well if you're already familiar with Flume's
>>>> execution model.
>>>>
>>>> -Joey
>>>>
>>>> On Thu, Oct 8, 2015 at 9:48 AM, Bryan Bende <bbe...@gmail.com> wrote:
>>>> >
>>>> > Hi Parul,
>>>> >
>>>> > It is possible to deploy a custom Flume source/sink to NiFi, but due
>>>> > to the way the Flume processors load the classes for the sources and sinks,
>>>> > the jar you deploy to the lib directory also needs to include the other
>>>> > dependencies your source/sink needs (or they each need to individually be
>>>> > in lib/ directly).
>>>> >
>>>> > So here is a sample project I created that makes a shaded jar:
>>>> > https://github.com/bbende/my-flume-source
>>>> >
>>>> > It will contain the custom source and following dependencies all in
>>>> > one jar:
>>>> >
>>>> > org.apache.flume:my-flume-source:jar:1.0-SNAPSHOT
>>>> > +- org.apache.flume:flume-ng-sdk:jar:1.6.0:compile
>>>> > +- org.apache.flume:flume-ng-core:jar:1.6.0:compile
>>>> > +- org.apache.flume:flume-ng-configuration:jar:1.6.0:compile
>>>> > +- org.apache.flume:flume-ng-auth:jar:1.6.0:compile
>>>> > \- com.google.guava:guava:jar:11.0.2:compile
>>>> >    \- com.google.code.findbugs:jsr305:jar:1.3.9:compile
>>>> >
>>>> > I copied that to NiFi lib, restarted, created an ExecuteFlumeSource
>>>> > processor with the following config:
>>>> >
>>>> > Source Type = org.apache.flume.MySource
>>>> > Agent Name = a1
>>>> > Source Name = r1
>>>> > Flume Configuration = a1.sources = r1
>>>> >
>>>> > And I was getting the output in nifi/logs/nifi-bootstrap.log
>>>> >
>>>> > Keep in mind that this could become risky because any classes found
>>>> > in the lib directory would be accessible to all NARs in NiFi and would be
>>>> > found before classes within a NAR because the parent is checked first
>>>> > during class loading.
>>>> > This example isn't too risky because we are only
>>>> > bringing in flume jars and one guava jar, but for example if another nar
>>>> > uses a different version of guava this is going to cause a problem.
>>>> >
>>>> > If you plan to use NiFi for the long term, it might be worth
>>>> > investing in converting your custom Flume components to NiFi processors. We
>>>> > can help you get started if you need any guidance going that route.
>>>> >
>>>> > -Bryan
>>>> >
>>>> >
>>>> > On Thu, Oct 8, 2015 at 2:30 AM, Parul Agrawal <parulagrawa...@gmail.com> wrote:
>>>> >>
>>>> >> Hello Bryan,
>>>> >>
>>>> >> Thank you very much for your response.
>>>> >>
>>>> >> Is it possible to have a customized flume source and sink in Nifi?
>>>> >> I have my own customized source and sink. I followed the steps below to
>>>> >> add my own customized source but it did not work.
>>>> >>
>>>> >> 1) Created a Maven project and added the customized source. (flume.jar was
>>>> >> created after this step)
>>>> >> 2) Added the flume.jar file to the nifi-0.3.0/lib folder.
>>>> >> 3) Added a flume source processor with the below configuration
>>>> >>
>>>> >> Property       Value
>>>> >> Source Type    com.flume.source.Source
>>>> >> Agent Name     a1
>>>> >> Source Name    k1.
>>>> >>
>>>> >> But I am getting the below error in the Flume Source Processor.
>>>> >> "Failed to run validation due to java.lang.NoClassDefFoundError :
>>>> >> /org/apache/flume/PollableSource."
>>>> >>
>>>> >> Can you please help me in this regard? Is there any step/configuration I
>>>> >> missed?
>>>> >>
>>>> >> Thanks and Regards,
>>>> >> Parul
>>>> >>
>>>> >>
>>>> >> On Wed, Oct 7, 2015 at 6:57 PM, Bryan Bende <bbe...@gmail.com> wrote:
>>>> >>>
>>>> >>> Hello,
>>>> >>>
>>>> >>> The NiFi Flume processors are for running Flume sources and sinks
>>>> >>> within NiFi. They don't communicate with an external Flume process.
>>>> >>>
>>>> >>> In your example you would need an ExecuteFlumeSource configured to
>>>> >>> run the netcat source, connected to an ExecuteFlumeSink configured with the
>>>> >>> logger.
>>>> >>>
>>>> >>> -Bryan
>>>> >>>
>>>> >>> On Wednesday, October 7, 2015, Parul Agrawal <parulagrawa...@gmail.com> wrote:
>>>> >>>>
>>>> >>>> Hi,
>>>> >>>>
>>>> >>>> I was trying to run the Nifi Flume processor with the below mentioned
>>>> >>>> details but could not bring it up.
>>>> >>>>
>>>> >>>> I already started flume with the sample configuration file
>>>> >>>> =============================================
>>>> >>>> # example.conf: A single-node Flume configuration
>>>> >>>>
>>>> >>>> # Name the components on this agent
>>>> >>>> a1.sources = r1
>>>> >>>> a1.sinks = k1
>>>> >>>> a1.channels = c1
>>>> >>>>
>>>> >>>> # Describe/configure the source
>>>> >>>> a1.sources.r1.type = netcat
>>>> >>>> a1.sources.r1.bind = localhost
>>>> >>>> a1.sources.r1.port = 44444
>>>> >>>>
>>>> >>>> # Describe the sink
>>>> >>>> a1.sinks.k1.type = logger
>>>> >>>>
>>>> >>>> # Use a channel which buffers events in memory
>>>> >>>> a1.channels.c1.type = memory
>>>> >>>> a1.channels.c1.capacity = 1000
>>>> >>>> a1.channels.c1.transactionCapacity = 100
>>>> >>>>
>>>> >>>> # Bind the source and sink to the channel
>>>> >>>> a1.sources.r1.channels = c1
>>>> >>>> a1.sinks.k1.channel = c1
>>>> >>>> =============================================
>>>> >>>>
>>>> >>>> Command used to start flume:
>>>> >>>> $ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console
>>>> >>>>
>>>> >>>> In the Nifi browser of ExecuteFlumeSink the following configuration
>>>> >>>> was done:
>>>> >>>> Property     Value
>>>> >>>> Sink Type    logger
>>>> >>>> Agent Name   a1
>>>> >>>> Sink Name    k1.
>>>> >>>>
>>>> >>>> Event is sent to flume using:
>>>> >>>> $ telnet localhost 44444
>>>> >>>> Trying 127.0.0.1...
>>>> >>>> Connected to localhost.localdomain (127.0.0.1).
>>>> >>>> Escape character is '^]'.
>>>> >>>> Hello world! <ENTER>
>>>> >>>> OK
>>>> >>>>
>>>> >>>> But I could not get any data in the nifi flume processor. Request
>>>> >>>> your help in this.
>>>> >>>> Do I need to change the example.conf file of flume so that the Nifi
>>>> >>>> Flume Sink gets the data?
>>>> >>>>
>>>> >>>> Thanks and Regards,
>>>> >>>> Parul
>>>> >>>
>>>> >>>
>>>> >>> --
>>>> >>> Sent from Gmail Mobile
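
The three steps listed earlier in the thread (assemble a complete <packet>...</packet> element from streamed chunks, convert it to JSON, hand it off to be written) can be sketched roughly as below. This is only a sketch of the logic in plain Python, not a NiFi processor and not the approach anyone in the thread implemented. The names PacketAssembler, element_to_dict and packet_to_json, and the JSON layout (tag / attributes / text / children keys), are assumptions made for illustration; none of them come from NiFi, Flume or tshark. It also assumes the start tag appears exactly as <packet>, with no attributes.

```python
import json
import xml.etree.ElementTree as ET

START_TAG = "<packet>"   # assumed to appear exactly like this, no attributes
END_TAG = "</packet>"


class PacketAssembler:
    """Hypothetical helper: buffers text chunks and returns every complete
    <packet>...</packet> element seen so far."""

    def __init__(self):
        self._buffer = ""

    def feed(self, chunk):
        """Append a chunk; return a list of complete packet XML strings."""
        self._buffer += chunk
        packets = []
        while True:
            start = self._buffer.find(START_TAG)
            if start == -1:
                # No start tag yet: keep only a short tail in case the tag
                # itself was split across two chunks.
                self._buffer = self._buffer[-(len(START_TAG) - 1):]
                break
            end = self._buffer.find(END_TAG, start)
            if end == -1:
                # Packet is still incomplete: drop anything before its start.
                self._buffer = self._buffer[start:]
                break
            end += len(END_TAG)
            packets.append(self._buffer[start:end])
            self._buffer = self._buffer[end:]
        return packets


def element_to_dict(elem):
    """Convert an XML element to a plain dict (illustrative layout only)."""
    node = {"tag": elem.tag}
    if elem.attrib:
        node["attributes"] = dict(elem.attrib)
    text = (elem.text or "").strip()
    if text:
        node["text"] = text
    children = [element_to_dict(child) for child in elem]
    if children:
        node["children"] = children
    return node


def packet_to_json(packet_xml):
    """Parse one complete packet element and serialize it as JSON."""
    return json.dumps(element_to_dict(ET.fromstring(packet_xml)))


if __name__ == "__main__":
    assembler = PacketAssembler()
    # Chunks deliberately split in the middle of a packet and of a tag.
    chunks = [
        '<pdml><packet><proto name="ip">',
        '<field name="ip.src" show="10.0.0.1"/></proto></packet><pac',
        'ket><proto name="tcp"/></packet>',
    ]
    for chunk in chunks:
        for packet in assembler.feed(chunk):
            print(packet_to_json(packet))
```

Each JSON string returned by packet_to_json would then be the content handed to whatever writes the file (PutFile in the flow described above); writing to disk is left out of the sketch.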