Mark,

As far as I know, no one is working on the RouteCSV processor yet. There is a JIRA for it, though: https://issues.apache.org/jira/browse/NIFI-1161
-Bryan

On Mon, Nov 23, 2015 at 4:58 PM, Mark Petronic <[email protected]> wrote:

> Mark Payne stated, "We should have a RouteCSV processor as well."
>
> I am wondering now, as I move further along in my ETL design and implementation: is anyone actually working on a RouteCSV processor? I want to avoid duplicating work if so; otherwise, I need to implement such a behavior. What I really need to do is route on a grouping key made up of some combination of column headers (or sub-parts of a column header, like the yyyymmdd part of some yyyymmddhhmmss timestamp in some column), combined with a variable that is NOT part of the header fields and that was received as an attribute on the flow file from an upstream processor. Clearly, being able to access the fields by column index vs. a regex is cleaner, at least for the CSV file use case.
>
> So, just curious on plans for RouteCSV.
>
> Thanks!
>
> On Fri, Nov 13, 2015 at 8:29 PM, Mark Payne <[email protected]> wrote:
>
>> Mark,
>>
>> That's great! Unfortunately, though, no mind reading :) This is an extremely powerful pattern, though, that fits perfectly within NiFi's wheelhouse. In a more generic sense, what we're really doing can be thought of as registering a "streaming query." We can register a query for any text that matches a regular expression, or any text that begins with a certain string, etc. Then, data just streams through the Processor and goes wherever it is configured to go.
>>
>> Similarly, we can use EvaluateJsonPath in conjunction with RouteOnAttribute to do the same thing with JSON data. Or EvaluateXPath & RouteOnAttribute to create a streaming query against XML. I believe something is in the works for Avro, as well.
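>>
>> As a rough illustration of the JSON case (the JsonPath and attribute names here are made up, just to show the shape of the flow), the pair might be configured like:
>>
>> EvaluateJsonPath (Destination = flowfile-attribute)
>>   device.type => $.device.type (dynamic property: attribute name / JsonPath)
>>
>> RouteOnAttribute (Routing Strategy = Route to Property name)
>>   thermostats => ${device.type:equals('thermostat')} (dynamic property: relationship / Expression Language)
>>
>> Each flow file is then routed to whichever relationship its "query" matches.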
>> This ability to take in a flood of data, configure a few "queries," and have that flood of data separated out into well-organized streams is probably my favorite aspect of NiFi. That being said, we are always learning and adapting and are still reasonably new to the Open Source world. So feel free to critique and provide any feedback you have. Since this processor hasn't been officially released yet, this is the easiest time to effect large changes :)
>>
>> On Nov 13, 2015, at 6:46 PM, Mark Petronic <[email protected]> wrote:
>>
>> Got to say, Mark... Loving the RouteText processor!!! It definitely solved multiple tasks including, as a side effect, stripping the CSV header (because it does not match the regex), which I was doing before with a sed command. :) Thank you (and the other contributors) for reading my mind and building this processor. Is that a feature of NiFi? Processors that show up before you ask for them? LOL. I was re-reading the developer guide and happened to see...
>>
>> "For example, imagine that we want to have a RouteCSV Processor such that it is configured with multiple Regular Expressions."
>>
>> LOL. Guess someone had the baby in mind for a while, too. :)
>>
>> On Fri, Nov 13, 2015 at 12:57 PM, Mark Petronic <[email protected]> wrote:
>>
>>> Excellent! I will build and play ASAP. :)
>>>
>>> On Fri, Nov 13, 2015 at 12:25 PM, Mark Payne <[email protected]> wrote:
>>>
>>>> Mark,
>>>>
>>>> Ok, thanks for the more detailed explanation. I think that makes RouteText a much more appealing solution. It is available on master now.
>>>>
>>>> Thanks
>>>> -Mark
>>>>
>>>> Sent from my iPhone
>>>>
>>>> On Nov 13, 2015, at 12:12 PM, Mark Petronic <[email protected]> wrote:
>>>>
>>>> Thank you, Mark, for the quick reply. My comments on your comments...
>>>>
>>>> "That's a great question! 200 million per day equates to about 2K - 3K per second."
>>>>
>>>> Unfortunately, the rate will be much more extreme. Those records all show up over the course of about 4 hours. So, every time a new zip file appears on my NFS share, I grab it and process it. About 160 files will appear over those 4 hours. For example, an average-sized zip file would likely contain about 180,000 or 1,600,000 records (depending on the file type) to process in that one scheduled NiFi run. And it is very likely that, for a given run, say scheduled every 30 minutes, there could be multiple files to process. I did not mention it before, but there are really two types of very large CSV files I have to process here: one is 18M records per day and the other 200M. So, traffic is very bursty.
>>>>
>>>> Does that change anything regarding the intended use case for the new RouteText?
>>>>
>>>> "We should have a RouteCSV processor as well."
>>>>
>>>> That would be very nice and definitely more performant without the need for regex matching. However, I would definitely benefit even from RouteText in the interim. For my use case, the regex will be pretty simple, as the timestamp is close to the front of the record, but I see where you are going on the potential complexity with groups and widely spread-out fields of interest.
>>>>
>>>> Is RouteText available on any branch where I could build and play around with it before 0.4.0?
>>>>
>>>> Thanks,
>>>> Mark
>>>>
>>>> On Fri, Nov 13, 2015 at 11:14 AM, Mark Payne <[email protected]> wrote:
>>>>
>>>>> Mark,
>>>>>
>>>>> That's a great question! 200 million per day equates to about 2K - 3K per second. So that is quite reasonable. You are very correct, though, that splitting that CSV into tons of one-line FlowFiles does indeed have a cost. Specifically, the big cost is the Provenance data that is generated at that rate. But again, 2K - 3K per second going through a handful of Processors is a very reasonable workload.
>>>>>
>>>>> I will caution you, though, that there is a ticket [1] where people will sometimes run into Out Of Memory errors if they try to split a huge CSV into individual FlowFiles, because it holds all of those FlowFile objects (not the data itself, but the attributes) in memory until the session is committed. The workaround for this (until that ticket is completed) is to use a SplitText to split into 10,000 lines or so per FlowFile and then another SplitText to split each of those smaller ones into 1-line FlowFiles.
>>>>>
>>>>> Also of note, in 0.4.0, which is expected to be released in around a week or so, there is a new RouteText Processor. This, I think, will make your life far easier. Rather than using SplitText, ExtractText, and MergeContent in order to group the text, RouteText will allow you to supply a Grouping Regex. That regex can just pull out the device id, year, month, and day from each line and group together lines of text that have the same values into a single FlowFile. For instance, if your CSV looked like:
>>>>>
>>>>> # device_id, device_manufacturer, value, year, month, day, hour
>>>>> 1234, Famous Manufacturer, 83, 2015, 11, 13, 12
>>>>>
>>>>> You could define a grouping regex as:
>>>>>
>>>>> (\d+), .*?, .*?, (\d+), (\d+), (\d+), .*
>>>>>
>>>>> It looks complex, but it's just breaking apart the CSV into individual fields and grouping on device_id, year, month, and day. This will also create a RouteText.Group attribute with the value "1234, 2015, 11, 13".
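>>>>>
>>>>> If you want to convince yourself of what that regex captures before wiring it into RouteText, a quick standalone Java sketch (nothing NiFi-specific; joining the groups with ", " is just to mimic the attribute value shown above) would be:
>>>>>
>>>>> import java.util.regex.Matcher;
>>>>> import java.util.regex.Pattern;
>>>>>
>>>>> public class GroupingRegexDemo {
>>>>>     public static void main(String[] args) {
>>>>>         // The grouping regex from above: capture device_id, year, month, day
>>>>>         final Pattern grouping = Pattern.compile("(\\d+), .*?, .*?, (\\d+), (\\d+), (\\d+), .*");
>>>>>         final Matcher m = grouping.matcher("1234, Famous Manufacturer, 83, 2015, 11, 13, 12");
>>>>>         if (m.matches()) {
>>>>>             // Join the captured groups to form the group value
>>>>>             System.out.println(String.join(", ", m.group(1), m.group(2), m.group(3), m.group(4)));
>>>>>             // prints: 1234, 2015, 11, 13
>>>>>         }
>>>>>     }
>>>>> }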
>>>>> This processor provides two benefits: it combines all of the grouping into a single Processor, and it cuts down on the millions of FlowFiles that are generated and then merged back together.
>>>>>
>>>>> As I write this, though, I am realizing that the regex above is quite a pain. We should have a RouteCSV processor as well. Though it won't provide any features that RouteText can't provide, it will make configuration far easier. I created a ticket for this here [2]. I'm not sure that it will make it into the 0.4.0 release, though.
>>>>>
>>>>> I hope this helps!
>>>>>
>>>>> Thanks
>>>>> -Mark
>>>>>
>>>>> [1] https://issues.apache.org/jira/browse/NIFI-1008
>>>>> [2] https://issues.apache.org/jira/browse/NIFI-1161
>>>>>
>>>>> On Nov 13, 2015, at 10:44 AM, Mark Petronic <[email protected]> wrote:
>>>>>
>>>>> I have a concept question. Say I have 10 GB of CSV files per day containing records where 99% of them are from the last couple of days, but there are late-reported stragglers that can date back many months. Devices that are powered off at times don't report, but eventually do when powered on, reporting old, captured data, a store-and-forward kind of thing. I want to capture all the data for historic reasons. There are about 200 million records per day to process. I want to put this data in Hive tables that are partitioned by year, month, and day and use ORC columnar storage. These tables are external Hive tables and point to the directories where I want to drop these files on HDFS, manually add new partitions as needed, and immediately be able to query using HQL.
>>>>>
>>>>> NiFi Concept:
>>>>>
>>>>> 0. Use GetFile to get a CSV file.
>>>>> 1. Use UpdateAttribute to parse the incoming CSV file name to obtain a device_id and set that as an attribute on the flow file.
>>>>> 2. Use SplitText to split each row into a flow file.
>>>>> 3. Use ExtractText to identify the field that is the record timestamp and create year, month, and day attributes on the flow file from that timestamp.
>>>>> 4. Use MergeContent with a grouping key made up of (device_id, year, month, day).
>>>>> 5. Convert each file to ORC (maybe Parquet). This stage will likely require me building a custom processor because the conversion is not going to be a simple A-to-B. I want to do some validation on fields, type conversion, and other custom stuff against some source-of-truth schema stored in a web service with a REST API.
>>>>> 6. Use PutHDFS to store these ORC files in directories like .../device_id/year=2015/month=11/day=9 by using the attributes already present from the upstream processors to build up the path (see the sketch after this list), where device_id is the Hive table name and the year, month, day are the partition key name=value per Hive format. The file names will just be some unique ID; they don't really matter.
>>>>> 7. Use ExecuteStreamCommand to execute a Hive script that will "alter table add partitions..." for any partitions that were newly created on this scheduled run.
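>>>>>
>>>>> For step 6, assuming the attributes set in steps 1 and 3 are literally named device_id, year, month, and day (those names are my own placeholders), the Directory property of PutHDFS supports Expression Language, so the path could be built along these lines:
>>>>>
>>>>> /data/${device_id}/year=${year}/month=${month}/day=${day}
>>>>>
>>>>> where /data is a made-up base path. Each flow file would then land in the partition directory that matches its own attributes.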
>>>>>
>>>>> Is this insane, or is it what NiFi was designed to do? I could definitely see using a Spark job to do the group-by (device_id, year, month, day) stage. The 200M flow files from SplitText is the part that has me wondering if I am nuts for thinking of doing that. There must be overhead on flow files, and decomposing them down to one line each seems to me a worst-case scenario. But it all depends on the core design and whether NiFi is optimized to handle such a use case.
>>>>>
>>>>> Thanks,
>>>>> Mark
