Hi,

I tried with the above JSON element, but I am getting the below mentioned error:

2015-10-12 23:53:39,209 ERROR [Timer-Driven Process Thread-9] o.a.n.p.standard.ConvertJSONToSQL ConvertJSONToSQL[id=0e964781-6914-486f-8bb7-214c6a1cd66e] Failed to parse StandardFlowFileRecord[uuid=dfc16db0-c7a6-4e9e-8b4d-8c5b4ec50742,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1444483036971-1, container=default, section=1], offset=132621, length=55],offset=0,name=json,size=55] as JSON due to org.apache.nifi.processor.exception.ProcessException: IOException thrown from ConvertJSONToSQL[id=0e964781-6914-486f-8bb7-214c6a1cd66e]: org.codehaus.jackson.JsonParseException: Unexpected character ('I' (code 73)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')

I also have a huge JSON object attached (new.json). Can you guide me on how to use the ConvertJSONToSQL processor? Should I use any other processor before ConvertJSONToSQL so that new.json can be converted into a flat document of key/value pairs, or an array of flat documents?

Any help/guidance would be really useful.

Thanks and Regards,
Parul

On Mon, Oct 12, 2015 at 10:36 PM, Bryan Bende <bbe...@gmail.com> wrote:

> I think ConvertJSONToSQL expects a flat document of key/value pairs, or an
> array of flat documents. So I think your JSON would be:
>
> [
>   {"firstname":"John", "lastname":"Doe"},
>   {"firstname":"Anna", "lastname":"Smith"}
> ]
>
> The table name will come from the Table Name property.
>
> Let us know if this doesn't work.
>
> -Bryan
>
> On Mon, Oct 12, 2015 at 12:19 PM, Parul Agrawal <parulagrawa...@gmail.com> wrote:
>
>> Hi,
>>
>> Thank you very much for all the support.
>> I was able to convert the XML format to JSON using a custom Flume source.
>>
>> Now I need the ConvertJSONToSQL processor to insert data into SQL.
>> I am trying to get hands-on with this processor and will update you on it.
>> Meanwhile, if you could share an example of using this processor with
>> sample JSON data, that would be great.
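[A note on the parse error above: `Unexpected character ('I' (code 73))` means the flowfile content was not JSON at all; the Jackson parser hit a literal `I` where a JSON value should begin. A quick way to sanity-check a payload before ConvertJSONToSQL sees it is sketched below, assuming `python3` is on the path; the file names and contents are made-up examples.]

```shell
# Valid flat array of key/value documents -- the shape ConvertJSONToSQL expects.
printf '[{"firstname":"John","lastname":"Doe"}]' > /tmp/ok.json
python3 -m json.tool /tmp/ok.json

# Anything starting with a bare letter (here an 'I') is rejected just like
# NiFi's Jackson parser rejected it.
printf 'Invalid payload' > /tmp/bad.json
python3 -m json.tool /tmp/bad.json || echo "not JSON"
```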
>>
>> ===============
>>
>> 1) I tried using the ConvertJSONToSQL processor with the below sample JSON
>> file:
>>
>> "details":[
>>   {"firstname":"John", "lastname":"Doe"},
>>   {"firstname":"Anna", "lastname":"Smith"}
>> ]
>>
>> 2) I created table details in PostgreSQL:
>>
>> select * from details;
>>  firstname | lastname
>> -----------+----------
>> (0 rows)
>>
>> 3) The ConvertJSONToSQL processor property details are as below:
>>
>> Property                  Value
>> JDBC Connection Pool      DBCPConnectionPool
>> Statement Type            INSERT
>> Table Name                details
>> Catalog Name              No value set
>> Translate Field Names     false
>> Unmatched Field Behavior  Ignore Unmatched Fields
>> Update Keys               No value set
>>
>> But I am getting the below mentioned error in the ConvertJSONToSQL processor:
>>
>> 2015-10-12 05:15:19,584 ERROR [Timer-Driven Process Thread-1] o.a.n.p.standard.ConvertJSONToSQL ConvertJSONToSQL[id=0e964781-6914-486f-8bb7-214c6a1cd66e] Failed to convert StandardFlowFileRecord[uuid=3a58716b-1474-4d75-91c1-e2fc3b9175ba,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1444483036971-1, container=default, section=1], offset=115045, length=104],offset=0,name=json,size=104] to a SQL INSERT statement due to org.apache.nifi.processor.exception.ProcessException: None of the fields in the JSON map to the columns defined by the details table; routing to failure: org.apache.nifi.processor.exception.ProcessException: None of the fields in the JSON map to the columns defined by the details table
>>
>> Thanks and Regards,
>> Parul
>>
>> On Sat, Oct 10, 2015 at 9:45 PM, Joey Echeverria <joe...@gmail.com> wrote:
>>
>>> I've done something like this by wrapping the command in a shell script:
>>>
>>> http://ingest.tips/2014/12/22/getting-started-with-apache-nifi/
>>>
>>> My use case was slightly different, but I'm pretty sure you can adapt
>>> the same idea.
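[Joey's wrapper-script idea also answers the question raised later in this thread about running multiple commands (e.g. `mkfifo /tmp/packet;tshark -i ens160 -T pdml >/tmp/packet`) from a single ExecuteProcess processor: `/bin/sh -c` treats a `;`-separated string as one command line. A minimal sketch, where the echo commands merely stand in for the real mkfifo/tshark pipeline:]

```shell
# ExecuteProcess launches one executable, but that executable can be a shell
# which then runs an arbitrary command list. The echoes below stand in for:
#   mkfifo /tmp/packet; tshark -i ens160 -T pdml > /tmp/packet
/bin/sh -c 'echo "first command"; echo "second command"'
```

Equivalently, put the command list in an executable wrapper script and point ExecuteProcess at the script, as in Joey's blog post.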
>>>
>>> -Joey
>>>
>>> On Oct 10, 2015, at 03:52, Parul Agrawal <parulagrawa...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> I actually need to get the data from a pipe, so the actual command I
>>> need is: mkfifo /tmp/packet;tshark -i ens160 -T pdml >/tmp/packet.
>>> Is it possible to use ExecuteProcess for multiple commands?
>>>
>>> On Sat, Oct 10, 2015 at 1:04 PM, Parul Agrawal <parulagrawa...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I added a custom Flume source, and when the Flume source sends data to
>>>> the Flume sink, the below mentioned error is thrown at the Flume sink:
>>>>
>>>> Administratively Yielded for 1 sec due to processing failure
>>>> 2015-10-10 02:30:45,027 WARN [Timer-Driven Process Thread-9] o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding ExecuteFlumeSink[id=2d08dfe7-4fd1-4a10-9d25-0b007a2c41bf] due to uncaught Exception: java.lang.IllegalStateException: close() called when transaction is OPEN - you must either commit or rollback first
>>>> 2015-10-10 02:30:45,028 WARN [Timer-Driven Process Thread-9] o.a.n.c.t.ContinuallyRunProcessorTask
>>>> java.lang.IllegalStateException: close() called when transaction is OPEN - you must either commit or rollback first
>>>>     at com.google.common.base.Preconditions.checkState(Preconditions.java:172) ~[guava-r05.jar:na]
>>>>     at org.apache.flume.channel.BasicTransactionSemantics.close(BasicTransactionSemantics.java:179) ~[flume-ng-core-1.6.0.jar:1.6.0]
>>>>     at org.apache.flume.sink.LoggerSink.process(LoggerSink.java:105) ~[flume-ng-core-1.6.0.jar:1.6.0]
>>>>     at org.apache.nifi.processors.flume.ExecuteFlumeSink.onTrigger(ExecuteFlumeSink.java:139) ~[na:na]
>>>>     at org.apache.nifi.processors.flume.AbstractFlumeProcessor.onTrigger(AbstractFlumeProcessor.java:148) ~[na:na]
>>>>     at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1077) ~[nifi-framework-core-0.3.0.jar:0.3.0]
>>>>     at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:127) [nifi-framework-core-0.3.0.jar:0.3.0]
>>>>     at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:49) [nifi-framework-core-0.3.0.jar:0.3.0]
>>>>     at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:119) [nifi-framework-core-0.3.0.jar:0.3.0]
>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_85]
>>>>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) [na:1.7.0_85]
>>>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) [na:1.7.0_85]
>>>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [na:1.7.0_85]
>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_85]
>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_85]
>>>>     at java.lang.Thread.run(Thread.java:745) [na:1.7.0_85]
>>>> 2015-10-10 02:30:46,029 ERROR [Timer-Driven Process Thread-9] o.a.n.processors.flume.ExecuteFlumeSink ExecuteFlumeSink[id=2d08dfe7-4fd1-4a10-9d25-0b007a2c41bf] ExecuteFlumeSink[id=2d08dfe7-4fd1-4a10-9d25-0b007a2c41bf] failed to process due to org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=8832b036-51a4-49cf-9703-fc4ed443ab80,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1444462207782-7, container=default, section=7], offset=180436, length=14078],offset=0,name=8311685679474355,size=14078] is not known in this session (StandardProcessSession[id=218318]); rolling back session: org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=8832b036-51a4-49cf-9703-fc4ed443ab80,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1444462207782-7, container=default, section=7], offset=180436, length=14078],offset=0,name=8311685679474355,size=14078] is not known in this session (StandardProcessSession[id=218318])
>>>>
>>>> Any idea what could be wrong here?
>>>>
>>>> Thanks and Regards,
>>>> Parul
>>>>
>>>> On Fri, Oct 9, 2015 at 6:32 PM, Bryan Bende <bbe...@gmail.com> wrote:
>>>>
>>>>> Hi Parul,
>>>>>
>>>>> I think it would be good to keep the conversation going on the users
>>>>> list, since there are more people who can offer help there, and it also
>>>>> helps everyone learn new solutions.
>>>>>
>>>>> The quick answer, though, is that NiFi has an ExecuteProcess processor
>>>>> which could execute "tshark -i eth0 -T pdml".
>>>>>
>>>>> There is not currently an XmlToJson processor, so this could be a
>>>>> place where you need a custom processor. For simple cases you can use an
>>>>> EvaluateXPath processor to extract values from the XML, and then a
>>>>> ReplaceText processor to build a new JSON document from those extracted
>>>>> values.
>>>>>
>>>>> -Bryan
>>>>>
>>>>> On Fri, Oct 9, 2015 at 3:39 AM, Parul Agrawal <parulagrawa...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> A little more to add: I need to keep reading the flowfile until the
>>>>>> END_TAG is received, i.e. we may need to concatenate the flowfile data
>>>>>> until the END_TAG, then convert it to JSON and call the PutFile
>>>>>> processor.
>>>>>>
>>>>>> Thanks and Regards,
>>>>>> Parul
>>>>>>
>>>>>> On Fri, Oct 9, 2015 at 10:56 AM, Parul Agrawal <parulagrawa...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Thank you very much again for the guidance provided.
>>>>>>> Basically I need a processor which would convert an XML file to JSON.
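[To illustrate Bryan's EvaluateXPath-then-ReplaceText suggestion outside of NiFi: the sketch below extracts attributes from a PDML-style `<packet>` snippet and emits a flat JSON document. It assumes `python3` is available; the snippet and field names are invented for illustration, not taken from the attached capture.]

```shell
# A minimal PDML-like packet (made-up fields for demonstration).
cat > /tmp/packet.xml <<'EOF'
<packet><field name="ip.src" show="10.0.0.1"/><field name="ip.dst" show="10.0.0.2"/></packet>
EOF

# Extract each <field> name/show pair and print one flat JSON object --
# the same flattening ConvertJSONToSQL needs downstream.
python3 - <<'EOF'
import json, xml.etree.ElementTree as ET
root = ET.parse('/tmp/packet.xml').getroot()
flat = {f.get('name'): f.get('show') for f in root.iter('field')}
print(json.dumps(flat))
EOF
```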
>>>>>>>
>>>>>>> Currently I have a Flume source which is of type "exec", and the
>>>>>>> command used is "tshark -i eth0 -T pdml".
>>>>>>>
>>>>>>> Here the Flume source keeps sending data to the Flume sink. This
>>>>>>> flowfile would be in PDML format.
>>>>>>>
>>>>>>> Now I need a processor which would do the following:
>>>>>>>
>>>>>>> 1) Form a complete XML file based on the START TAG (<packet>)
>>>>>>>    and END TAG (</packet>).
>>>>>>> 2) Once the XML message is formed, convert it to JSON.
>>>>>>> 3) Place the JSON file in a local directory using the PutFile processor.
>>>>>>>
>>>>>>> I am not sure if I have explained the processor requirement clearly.
>>>>>>> It would be really great if you could help me with this.
>>>>>>>
>>>>>>> Thanks and Regards,
>>>>>>> Parul
>>>>>>>
>>>>>>> On Thu, Oct 8, 2015 at 10:02 PM, Joey Echeverria <joe...@gmail.com> wrote:
>>>>>>>
>>>>>>>> > If you plan to use NiFi for the long term, it might be worth
>>>>>>>> investing in converting your custom Flume components to NiFi
>>>>>>>> processors. We can help you get started if you need any guidance
>>>>>>>> going that route.
>>>>>>>>
>>>>>>>> +1. Running Flume sources/sinks is meant as a transition step. It's
>>>>>>>> really useful if you have a complex Flume flow and want to migrate
>>>>>>>> only parts of it over to NiFi at a time. I would port any custom
>>>>>>>> sources and sinks to NiFi once you know that it will meet your needs
>>>>>>>> well. NiFi has a lot of documentation on writing processors, and the
>>>>>>>> concepts map pretty well if you're already familiar with Flume's
>>>>>>>> execution model.
>>>>>>>>
>>>>>>>> -Joey
>>>>>>>>
>>>>>>>> On Thu, Oct 8, 2015 at 9:48 AM, Bryan Bende <bbe...@gmail.com> wrote:
>>>>>>>> >
>>>>>>>> > Hi Parul,
>>>>>>>> >
>>>>>>>> > It is possible to deploy a custom Flume source/sink to NiFi, but
>>>>>>>> > due to the way the Flume processors load the classes for the
>>>>>>>> > sources and sinks, the jar you deploy to the lib directory also
>>>>>>>> > needs to include the other dependencies your source/sink needs
>>>>>>>> > (or they each need to individually be in lib/ directly).
>>>>>>>> >
>>>>>>>> > So here is a sample project I created that makes a shaded jar:
>>>>>>>> > https://github.com/bbende/my-flume-source
>>>>>>>> >
>>>>>>>> > It will contain the custom source and the following dependencies
>>>>>>>> > all in one jar:
>>>>>>>> >
>>>>>>>> > org.apache.flume:my-flume-source:jar:1.0-SNAPSHOT
>>>>>>>> > +- org.apache.flume:flume-ng-sdk:jar:1.6.0:compile
>>>>>>>> > +- org.apache.flume:flume-ng-core:jar:1.6.0:compile
>>>>>>>> > +- org.apache.flume:flume-ng-configuration:jar:1.6.0:compile
>>>>>>>> > +- org.apache.flume:flume-ng-auth:jar:1.6.0:compile
>>>>>>>> > \- com.google.guava:guava:jar:11.0.2:compile
>>>>>>>> >    \- com.google.code.findbugs:jsr305:jar:1.3.9:compile
>>>>>>>> >
>>>>>>>> > I copied that to the NiFi lib, restarted, and created an
>>>>>>>> > ExecuteFlumeSource processor with the following config:
>>>>>>>> >
>>>>>>>> > Source Type = org.apache.flume.MySource
>>>>>>>> > Agent Name = a1
>>>>>>>> > Source Name = r1
>>>>>>>> > Flume Configuration = a1.sources = r1
>>>>>>>> >
>>>>>>>> > And I was getting the output in nifi/logs/nifi-bootstrap.log.
>>>>>>>> >
>>>>>>>> > Keep in mind that this could become risky, because any classes
>>>>>>>> > found in the lib directory would be accessible to all NARs in
>>>>>>>> > NiFi, and would be found before classes within a NAR because the
>>>>>>>> > parent is checked first during class loading. This example isn't
>>>>>>>> > too risky because we are only bringing in Flume jars and one
>>>>>>>> > Guava jar, but if, for example, another NAR uses a different
>>>>>>>> > version of Guava, this is going to cause a problem.
>>>>>>>> >
>>>>>>>> > If you plan to use NiFi for the long term, it might be worth
>>>>>>>> > investing in converting your custom Flume components to NiFi
>>>>>>>> > processors. We can help you get started if you need any guidance
>>>>>>>> > going that route.
>>>>>>>> >
>>>>>>>> > -Bryan
>>>>>>>> >
>>>>>>>> > On Thu, Oct 8, 2015 at 2:30 AM, Parul Agrawal <parulagrawa...@gmail.com> wrote:
>>>>>>>> >>
>>>>>>>> >> Hello Bryan,
>>>>>>>> >>
>>>>>>>> >> Thank you very much for your response.
>>>>>>>> >>
>>>>>>>> >> Is it possible to have a customized Flume source and sink in NiFi?
>>>>>>>> >> I have my own customized source and sink. I followed the below
>>>>>>>> >> steps to add my own customized source, but it did not work:
>>>>>>>> >>
>>>>>>>> >> 1) Created a Maven project and added the customized source
>>>>>>>> >> (flume.jar was created after this step).
>>>>>>>> >> 2) Added the flume.jar file to the nifi-0.3.0/lib folder.
>>>>>>>> >> 3) Added a Flume source processor with the below configuration:
>>>>>>>> >>
>>>>>>>> >> Property     Value
>>>>>>>> >> Source Type  com.flume.source.Source
>>>>>>>> >> Agent Name   a1
>>>>>>>> >> Source Name  k1
>>>>>>>> >>
>>>>>>>> >> But I am getting the below error in the Flume source processor:
>>>>>>>> >> "Failed to run validation due to java.lang.NoClassDefFoundError: /org/apache/flume/PollableSource."
>>>>>>>> >>
>>>>>>>> >> Can you please help me in this regard? Is there any
>>>>>>>> >> step/configuration I missed?
>>>>>>>> >>
>>>>>>>> >> Thanks and Regards,
>>>>>>>> >> Parul
>>>>>>>> >>
>>>>>>>> >> On Wed, Oct 7, 2015 at 6:57 PM, Bryan Bende <bbe...@gmail.com> wrote:
>>>>>>>> >>>
>>>>>>>> >>> Hello,
>>>>>>>> >>>
>>>>>>>> >>> The NiFi Flume processors are for running Flume sources and
>>>>>>>> >>> sinks within NiFi.
>>>>>>>> >>> They don't communicate with an external Flume process.
>>>>>>>> >>>
>>>>>>>> >>> In your example you would need an ExecuteFlumeSource configured
>>>>>>>> >>> to run the netcat source, connected to an ExecuteFlumeSink
>>>>>>>> >>> configured with the logger.
>>>>>>>> >>>
>>>>>>>> >>> -Bryan
>>>>>>>> >>>
>>>>>>>> >>> On Wednesday, October 7, 2015, Parul Agrawal <parulagrawa...@gmail.com> wrote:
>>>>>>>> >>>>
>>>>>>>> >>>> Hi,
>>>>>>>> >>>>
>>>>>>>> >>>> I was trying to run the NiFi Flume processor with the below
>>>>>>>> >>>> mentioned details but could not bring it up.
>>>>>>>> >>>>
>>>>>>>> >>>> I already started Flume with the sample configuration file:
>>>>>>>> >>>> =============================================
>>>>>>>> >>>> # example.conf: A single-node Flume configuration
>>>>>>>> >>>>
>>>>>>>> >>>> # Name the components on this agent
>>>>>>>> >>>> a1.sources = r1
>>>>>>>> >>>> a1.sinks = k1
>>>>>>>> >>>> a1.channels = c1
>>>>>>>> >>>>
>>>>>>>> >>>> # Describe/configure the source
>>>>>>>> >>>> a1.sources.r1.type = netcat
>>>>>>>> >>>> a1.sources.r1.bind = localhost
>>>>>>>> >>>> a1.sources.r1.port = 44444
>>>>>>>> >>>>
>>>>>>>> >>>> # Describe the sink
>>>>>>>> >>>> a1.sinks.k1.type = logger
>>>>>>>> >>>>
>>>>>>>> >>>> # Use a channel which buffers events in memory
>>>>>>>> >>>> a1.channels.c1.type = memory
>>>>>>>> >>>> a1.channels.c1.capacity = 1000
>>>>>>>> >>>> a1.channels.c1.transactionCapacity = 100
>>>>>>>> >>>>
>>>>>>>> >>>> # Bind the source and sink to the channel
>>>>>>>> >>>> a1.sources.r1.channels = c1
>>>>>>>> >>>> a1.sinks.k1.channel = c1
>>>>>>>> >>>> =============================================
>>>>>>>> >>>>
>>>>>>>> >>>> Command used to start Flume:
>>>>>>>> >>>> $ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console
>>>>>>>> >>>>
>>>>>>>> >>>> In the NiFi browser, the ExecuteFlumeSink was configured as follows:
>>>>>>>> >>>>
>>>>>>>> >>>> Property    Value
>>>>>>>> >>>> Sink Type   logger
>>>>>>>> >>>> Agent Name  a1
>>>>>>>> >>>> Sink Name   k1
>>>>>>>> >>>>
>>>>>>>> >>>> An event is sent to Flume using:
>>>>>>>> >>>> $ telnet localhost 44444
>>>>>>>> >>>> Trying 127.0.0.1...
>>>>>>>> >>>> Connected to localhost.localdomain (127.0.0.1).
>>>>>>>> >>>> Escape character is '^]'.
>>>>>>>> >>>> Hello world! <ENTER>
>>>>>>>> >>>> OK
>>>>>>>> >>>>
>>>>>>>> >>>> But I could not get any data in the NiFi Flume processor.
>>>>>>>> >>>> Request your help with this.
>>>>>>>> >>>> Do I need to change the example.conf file of Flume so that the
>>>>>>>> >>>> NiFi Flume sink gets the data?
>>>>>>>> >>>>
>>>>>>>> >>>> Thanks and Regards,
>>>>>>>> >>>> Parul
>>>>>>>> >>>
>>>>>>>> >>> --
>>>>>>>> >>> Sent from Gmail Mobile
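[To summarize Bryan's point for anyone finding this thread: both halves of the netcat-to-logger example have to run inside NiFi, with a NiFi connection between them instead of a Flume channel. A sketch of the two processor configurations, using the property names shown above; the Flume Configuration values are assumptions based on the example.conf in this thread, not a tested setup:]

```
ExecuteFlumeSource
  Source Type         = netcat
  Agent Name          = a1
  Source Name         = r1
  Flume Configuration = a1.sources = r1
                        a1.sources.r1.type = netcat
                        a1.sources.r1.bind = localhost
                        a1.sources.r1.port = 44444

ExecuteFlumeSink   (connected downstream of the source)
  Sink Type           = logger
  Agent Name          = a1
  Sink Name           = k1
  Flume Configuration = a1.sinks = k1
                        a1.sinks.k1.type = logger
```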