Hi,

I added custom flume source and when flume source is sending the data to
flume sink, below mentioned error is thrown at flume sink.

 Administratively Yielded for 1 sec due to processing failure
2015-10-10 02:30:45,027 WARN [Timer-Driven Process Thread-9]
o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding
ExecuteFlumeSink[id=2d08dfe7-4fd1-4a10-9d25-0b007a2c41bf] due to uncaught
Exception: java.lang.IllegalStateException: close() called when transaction
is OPEN - you must either commit or rollback first
2015-10-10 02:30:45,028 WARN [Timer-Driven Process Thread-9]
o.a.n.c.t.ContinuallyRunProcessorTask
java.lang.IllegalStateException: close() called when transaction is OPEN -
you must either commit or rollback first
        at
com.google.common.base.Preconditions.checkState(Preconditions.java:172)
~[guava-r05.jar:na]
        at
org.apache.flume.channel.BasicTransactionSemantics.close(BasicTransactionSemantics.java:179)
~[flume-ng-core-1.6.0.jar:1.6.0]
        at org.apache.flume.sink.LoggerSink.process(LoggerSink.java:105)
~[flume-ng-core-1.6.0.jar:1.6.0]
        at
org.apache.nifi.processors.flume.ExecuteFlumeSink.onTrigger(ExecuteFlumeSink.java:139)
~[na:na]
        at
org.apache.nifi.processors.flume.AbstractFlumeProcessor.onTrigger(AbstractFlumeProcessor.java:148)
~[na:na]
        at
org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1077)
~[nifi-framework-core-0.3.0.jar:0.3.0]
        at
org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:127)
[nifi-framework-core-0.3.0.jar:0.3.0]
        at
org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:49)
[nifi-framework-core-0.3.0.jar:0.3.0]
        at
org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:119)
[nifi-framework-core-0.3.0.jar:0.3.0]
        at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
[na:1.7.0_85]
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
[na:1.7.0_85]
        at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
[na:1.7.0_85]
        at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
[na:1.7.0_85]
        at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
[na:1.7.0_85]
        at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
[na:1.7.0_85]
        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_85]
2015-10-10 02:30:46,029 ERROR [Timer-Driven Process Thread-9]
o.a.n.processors.flume.ExecuteFlumeSink
ExecuteFlumeSink[id=2d08dfe7-4fd1-4a10-9d25-0b007a2c41bf]
ExecuteFlumeSink[id=2d08dfe7-4fd1-4a10-9d25-0b007a2c41bf] failed to process
due to org.apache.nifi.processor.exception.FlowFileHandlingException:
StandardFlowFileRecord[uuid=8832b036-51a4-49cf-9703-fc4ed443ab80,claim=StandardContentClaim
[resourceClaim=StandardResourceClaim[id=1444462207782-7, container=default,
section=7], offset=180436,
length=14078],offset=0,name=8311685679474355,size=14078] is not known in
this session (StandardProcessSession[id=218318]); rolling back session:
org.apache.nifi.processor.exception.FlowFileHandlingException:
StandardFlowFileRecord[uuid=8832b036-51a4-49cf-9703-fc4ed443ab80,claim=StandardContentClaim
[resourceClaim=StandardResourceClaim[id=1444462207782-7, container=default,
section=7], offset=180436,
length=14078],offset=0,name=8311685679474355,size=14078] is not known in
this session (StandardProcessSession[id=218318])

Any idea what could be wrong in this.

Thanks and Regards,
Parul


On Fri, Oct 9, 2015 at 6:32 PM, Bryan Bende <bbe...@gmail.com> wrote:

> Hi Parul,
>
> I think it would be good to keep the convo going on the users list since
> there are more people who can offer help there, and also helps everyone
> learn new solutions.
>
> The quick answer though is that NiFi has an ExecuteProcess processor which
> could execute "tshark -i eth0 -T pdml".
>
> There is not currently an XmlToJson processor, so this could be a place
> where you need a custom processor. For simple cases you can use an
> EvaluateXPath processor to extract values from the XML, and then a
> ReplaceText processor to build a new json document from those extracted
> values.
>
> -Bryan
>
>
> On Fri, Oct 9, 2015 at 3:39 AM, Parul Agrawal <parulagrawa...@gmail.com>
> wrote:
>
>> Hi,
>>
>> Little more to add.....
>>  I need to keep reading the flowfile till END_TAG is received. i.e. we
>> may need to concatenate the flowfile data till END_TAG.
>> and then convert it to json and call PutFile() processor.
>>
>> Thanks and Regards,
>> Parul
>>
>>
>>
>> On Fri, Oct 9, 2015 at 10:56 AM, Parul Agrawal <parulagrawa...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> Thank you very much again for the guidance provided.
>>> Basically I would need a processor which would convert XML file to Json.
>>>
>>> Currently I have a flume source which is of type "exec" and the command
>>> used is "tshark -i eth0 -T pdml".
>>>
>>> Here Flume source keeps sending data to flume sink. This flow file would
>>> be of PDML format.
>>>
>>> Now I need a processor which would do the following
>>>
>>> 1) Form a complete XML file based on START TAG (<packet>)
>>> and END TAG (</packet>)
>>> 2) Once the XML message is formed convert it to json.
>>> 3) Place a json file to local directory using PutFile() processor.
>>>
>>> I am not sure if I could able to explain the processor requirement.
>>> Would be really great if you could help me in this.
>>>
>>> Thanks and Regards,
>>> Parul
>>>
>>>
>>> On Thu, Oct 8, 2015 at 10:02 PM, Joey Echeverria <joe...@gmail.com>
>>> wrote:
>>>
>>>> > If you plan to use NiFi for the long term, it might be worth
>>>> investing in converting your custom Flume components to NiFi processors. We
>>>> can help you get started if you need any guidance going that route.
>>>>
>>>> +1. Running Flume sources/sinks is meant as a transition step. It's
>>>> really useful if you have a complex Flume flow and want to migrate
>>>> only parts of it over to NiFi at a time. I would port any custom
>>>> sources and sinks to NiFi once you knew that it would meet your needs
>>>> well. NiFi has a lot of documentation on writing processors and the
>>>> concepts map pretty well if you're already familiar with Flume's
>>>> execution model.
>>>>
>>>> -Joey
>>>>
>>>> On Thu, Oct 8, 2015 at 9:48 AM, Bryan Bende <bbe...@gmail.com> wrote:
>>>> >
>>>> > Hi Parul,
>>>> >
>>>> > It is possible to deploy a custom Flume source/sink to NiFi, but due
>>>> to the way the Flume processors load the classes for the sources and sinks,
>>>> the jar you deploy to the lib directory also needs to include the other
>>>> dependencies your source/sink needs (or they each need to individually be
>>>> in lib/ directly).
>>>> >
>>>> > So here is a sample project I created that makes a shaded jar:
>>>> > https://github.com/bbende/my-flume-source
>>>> >
>>>> > It will contain the custom source and following dependencies all in
>>>> one jar:
>>>> >
>>>> > org.apache.flume:my-flume-source:jar:1.0-SNAPSHOT
>>>> > +- org.apache.flume:flume-ng-sdk:jar:1.6.0:compile
>>>> > +- org.apache.flume:flume-ng-core:jar:1.6.0:compile
>>>> > +- org.apache.flume:flume-ng-configuration:jar:1.6.0:compile
>>>> > +- org.apache.flume:flume-ng-auth:jar:1.6.0:compile
>>>> >   \- com.google.guava:guava:jar:11.0.2:compile
>>>> >      \- com.google.code.findbugs:jsr305:jar:1.3.9:compile
>>>> >
>>>> > I copied that to NiFi lib, restarted, created an ExecuteFlumeSource
>>>> processor with the following config:
>>>> >
>>>> > Source Type = org.apache.flume.MySource
>>>> > Agent Name = a1
>>>> > Source Name = r1
>>>> > Flume Configuration = a1.sources = r1
>>>> >
>>>> > And I was getting the output in nifi/logs/nifi-bootstrap.log
>>>> >
>>>> > Keep in mind that this could become risky because any classes found
>>>> in the lib directory would be accessible to all NARs in NiFi and would be
>>>> found before classes within a NAR because the parent is checked first
>>>> during class loading. This example isn't too risky because we are only
>>>> bringing in flume jars and one guava jar, but for example if another nar
>>>> uses a different version of guava this is going to cause a problem.
>>>> >
>>>> > If you plan to use NiFi for the long term, it might be worth
>>>> investing in converting your custom Flume components to NiFi processors. We
>>>> can help you get started if you need any guidance going that route.
>>>> >
>>>> > -Bryan
>>>> >
>>>> >
>>>> > On Thu, Oct 8, 2015 at 2:30 AM, Parul Agrawal <
>>>> parulagrawa...@gmail.com> wrote:
>>>> >>
>>>> >> Hello Bryan,
>>>> >>
>>>> >> Thank you very much for your response.
>>>> >>
>>>> >> Is it possible to have customized flume source and sink in Nifi?
>>>> >> I have my own customized source and sink? I followed below steps to
>>>> add my own customized source but it did not work.
>>>> >>
>>>> >> 1) Created Maven project and added customized source. (flume.jar was
>>>> created after this step)
>>>> >> 2) Added flume.jar file to nifi-0.3.0/lib folder.
>>>> >> 3) Added flume source processor with the below configuration
>>>> >>
>>>> >> Property           Value
>>>> >> Source Type         com.flume.source.Source
>>>> >> Agent Name      a1
>>>> >> Source Name         k1.
>>>> >>
>>>> >> But I am getting the below error in Flume Source Processor.
>>>> >> "Failed to run validation due to java.lang.NoClassDefFoundError :
>>>> /org/apache/flume/PollableSource."
>>>> >>
>>>> >> Can you please help me in this regard. Any step/configuration I
>>>> missed.
>>>> >>
>>>> >> Thanks and Regards,
>>>> >> Parul
>>>> >>
>>>> >>
>>>> >> On Wed, Oct 7, 2015 at 6:57 PM, Bryan Bende <bbe...@gmail.com>
>>>> wrote:
>>>> >>>
>>>> >>> Hello,
>>>> >>>
>>>> >>> The NiFi Flume processors are for running Flume sources and sinks
>>>> with in NiFi. They don't communicate with an external Flume process.
>>>> >>>
>>>> >>> In your example you would need an ExecuteFlumeSource configured to
>>>> run the netcat source, connected to a ExecuteFlumeSink configured with the
>>>> logger.
>>>> >>>
>>>> >>> -Bryan
>>>> >>>
>>>> >>> On Wednesday, October 7, 2015, Parul Agrawal <
>>>> parulagrawa...@gmail.com> wrote:
>>>> >>>>
>>>> >>>> Hi,
>>>> >>>>
>>>> >>>> I was trying to run Nifi Flume processor with the below mentioned
>>>> >>>> details but not could bring it up.
>>>> >>>>
>>>> >>>> I already started flume with the sample configuration file
>>>> >>>> =============================================
>>>> >>>> # example.conf: A single-node Flume configuration
>>>> >>>>
>>>> >>>> # Name the components on this agent
>>>> >>>> a1.sources = r1
>>>> >>>> a1.sinks = k1
>>>> >>>> a1.channels = c1
>>>> >>>>
>>>> >>>> # Describe/configure the source
>>>> >>>> a1.sources.r1.type = netcat
>>>> >>>> a1.sources.r1.bind = localhost
>>>> >>>> a1.sources.r1.port = 44444
>>>> >>>>
>>>> >>>> # Describe the sink
>>>> >>>> a1.sinks.k1.type = logger
>>>> >>>>
>>>> >>>> # Use a channel which buffers events in memory
>>>> >>>> a1.channels.c1.type = memory
>>>> >>>> a1.channels.c1.capacity = 1000
>>>> >>>> a1.channels.c1.transactionCapacity = 100
>>>> >>>>
>>>> >>>> # Bind the source and sink to the channel
>>>> >>>> a1.sources.r1.channels = c1
>>>> >>>> a1.sinks.k1.channel = c1
>>>> >>>> =============================================
>>>> >>>>
>>>> >>>> Command used to start flume : $ bin/flume-ng agent --conf conf
>>>> >>>> --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console
>>>> >>>>
>>>> >>>> In the Nifi browser of ExecuteFlumeSink following configuration
>>>> was done:
>>>> >>>> Property           Value
>>>> >>>> Sink Type         logger
>>>> >>>> Agent Name      a1
>>>> >>>> Sink Name         k1.
>>>> >>>>
>>>> >>>> Event is sent to the flume using:
>>>> >>>> $ telnet localhost 44444
>>>> >>>> Trying 127.0.0.1...
>>>> >>>> Connected to localhost.localdomain (127.0.0.1).
>>>> >>>> Escape character is '^]'.
>>>> >>>> Hello world! <ENTER>
>>>> >>>> OK
>>>> >>>>
>>>> >>>> But I could not get any data in the nifi flume processor. Request
>>>> your
>>>> >>>> help in this.
>>>> >>>> Do i need to change the example.conf file of flume so that Nifi
>>>> Flume
>>>> >>>> Sink should get the data.
>>>> >>>>
>>>> >>>> Thanks and Regards,
>>>> >>>> Parul
>>>> >>>
>>>> >>>
>>>> >>>
>>>> >>> --
>>>> >>> Sent from Gmail Mobile
>>>> >>
>>>> >>
>>>> >
>>>>
>>>
>>>
>>
>

Reply via email to