Thanks Wolfgang! That is very nice.
On Thu, Jan 2, 2014 at 6:49 PM, Wolfgang Hoschek <[email protected]> wrote:

> FWIW, here is an example of how this could be handled in a
> MorphlineInterceptor:
>
>     morphlines : [
>       {
>         id : morphline1
>         importCommands : ["org.kitesdk.**"]
>
>         commands : [
>           {
>             tryRules {
>               catchExceptions : true
>               rules : [
>                 # first rule
>                 {
>                   commands : [
>                     # save initial state
>                     { setValues { _tmp : "@{_attachment_body}" } }
>
>                     # if JSON parsing succeeds, replace _attachment_body
>                     # with a Jackson JSON object
>                     { readJson {} }
>
>                     # if we reach here the JSON parsing has succeeded;
>                     # restore the state prior to the readJson command
>                     { setValues { _attachment_body : "@{_tmp}" } }
>                     { setValues { _tmp : [] } }
>                     { setValues { _attachment_mimetype : [] } }
>                   ]
>                 }
>
>                 # second rule is executed if the previous rule failed
>                 # or threw an exception
>                 {
>                   commands : [
>                     { logDebug { format : "Marking event as malformed for downstream sink: {}", args : ["@{}"] } }
>                     { addValues { malformed : true } }
>                   ]
>                 }
>               ]
>             }
>           }
>         ]
>       }
>     ]
>
> Also see
> http://kitesdk.org/docs/current/kite-morphlines/morphlinesReferenceGuide.html#tryRules
>
> Wolfgang.
>
> On Jan 3, 2014, at 2:03 AM, ed wrote:
>
> > Thank you Brock, Devin, and Jimmy for the great information. Dumping
> > null values in the EventSerializer write method looks really easy to
> > do, but I think using a custom interceptor to validate and then tag
> > the event for proper good/bad routing sounds like a great idea and
> > seems to fit better into the Flume way of doing things.
> >
> > Thank you again!
> >
> > ~Ed
> >
> > On Fri, Jan 3, 2014 at 2:40 AM, Devin Suiter RDX <[email protected]> wrote:
> >
> > Yes, the regex interceptors and selectors can be very powerful;
> > experimenting with them was really exciting.
> >
> > Brock, thanks for validating the ML idea. As with most things, the
> > simplest solution is probably the way to go, and in this use case the
> > morphlines might be overkill.
> >
> > Devin Suiter
> > Jr. Data Solutions Software Engineer
> > 100 Sandusky Street | 2nd Floor | Pittsburgh, PA 15212
> > Google Voice: 412-256-8556 | www.rdx.com
> >
> > On Thu, Jan 2, 2014 at 12:27 PM, Brock Noland <[email protected]> wrote:
> >
> > Jimmy, great to hear that method is working for you!
> >
> > Devin, regarding the morphlines question: since ML can have arbitrary
> > Java plugins, it *can* do just about anything. I generally think of ML
> > as the T in ETL, so doing the validation in ML might make sense. In
> > general, though, I think adding the custom header field is probably
> > the best option for dealing with bad data.
> >
> > Once again, thank you everyone for using our software!
> >
> > On Thu, Jan 2, 2014 at 10:10 AM, Jimmy <[email protected]> wrote:
> >
> > We are doing a similar thing to what Brock mentioned: a simple
> > interceptor for JSON validation that updates a custom field in the
> > header. The Flume HDFS sink then pushes the data to a good/bad target
> > directory based on this custom field, and a separate process watches
> > the bad directory.
> >
> > You could add notification to the Flume flow; we wanted to keep it
> > very simple.
> >
> > ---------- Forwarded message ----------
> > From: Devin Suiter RDX <[email protected]>
> > Date: Thu, Jan 2, 2014 at 7:40 AM
> > Subject: Re: Handling malformed data when using custom AvroEventSerializer and HDFS Sink
> > To: [email protected]
> >
> > Just throwing this out there, since I haven't had time to dig into
> > the API with a big fork, but can morphlines offer any assistance here?
> >
> > Some kind of an interceptor that would parse for malformed data,
> > package the offending data and send it somewhere (email it, log it),
> > and then project a valid "there was something wrong here" piece of
> > data into the field, then allow your channel to carry on? Or skip the
> > projection piece and just move along? I was just thinking that
> > projecting known data into a field that previously held malformed
> > data would let you easily locate those records later via the
> > projected data, while keeping your data shape consistent.
> >
> > Kind of looking to Brock as a sounding board on the appropriateness
> > of this as a potential solution, since morphlines take some time to
> > really understand well...
> >
> > Devin Suiter
> > Jr. Data Solutions Software Engineer
> > 100 Sandusky Street | 2nd Floor | Pittsburgh, PA 15212
> > Google Voice: 412-256-8556 | www.rdx.com
> >
> > On Thu, Jan 2, 2014 at 10:25 AM, Brock Noland <[email protected]> wrote:
> >
> > On Tue, Dec 31, 2013 at 8:34 PM, ed <[email protected]> wrote:
> >
> > > Hello,
> > >
> > > We are using Flume v1.4 to load JSON-formatted log data into HDFS
> > > as Avro. Our Flume setup looks like this:
> > >
> > >     NXLog ==> (FlumeHTTPSource -> HDFSSink w/ custom EventSerializer)
> > >
> > > Right now our custom EventSerializer (which extends
> > > AbstractAvroEventSerializer) takes the JSON input from the
> > > HTTPSource and converts it into an Avro record of the appropriate
> > > type for the incoming log file. This is working great, and we use
> > > the serializer to add some additional "synthetic" fields to the
> > > Avro record that don't exist in the original JSON log data.
> > >
> > > My question concerns how to handle malformed JSON data (or really
> > > any error inside the custom EventSerializer). It's very likely that
> > > as we parse the JSON there will be records where something is
> > > malformed (either the JSON itself, or a field of the wrong type,
> > > etc.).
> > >
> > > For example, a "port" field, which should always be an Integer,
> > > might for some reason have some ASCII text in it. I'd like to catch
> > > these errors in the EventSerializer and then write the bad JSON out
> > > to a log file somewhere that we can monitor.
> >
> > Yeah, it would be nice to have a better story about this in Flume.
> >
> > > What is the best way to do this?
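One concrete way to do this, following the interceptor-plus-header routing Jimmy and Brock describe above, is a multiplexing channel selector. This is only a sketch: the agent, channel, sink, and interceptor class names (including com.example.JsonValidatingInterceptor) are hypothetical, while the selector and sink settings follow standard Flume 1.x configuration:

```properties
# One source fans out to two channels; a custom interceptor tags each
# event and the multiplexing selector routes on that tag.
agent.sources = http-src
agent.channels = good-ch bad-ch
agent.sinks = good-hdfs bad-hdfs

agent.sources.http-src.type = http
agent.sources.http-src.port = 5140
agent.sources.http-src.channels = good-ch bad-ch

# Hypothetical interceptor that validates the JSON body and sets a
# "valid" header to "true" or "false" on each event.
agent.sources.http-src.interceptors = i1
agent.sources.http-src.interceptors.i1.type = com.example.JsonValidatingInterceptor$Builder

# Route events on the "valid" header; anything unexpected goes to bad-ch.
agent.sources.http-src.selector.type = multiplexing
agent.sources.http-src.selector.header = valid
agent.sources.http-src.selector.mapping.true = good-ch
agent.sources.http-src.selector.mapping.false = bad-ch
agent.sources.http-src.selector.default = bad-ch

agent.channels.good-ch.type = memory
agent.channels.bad-ch.type = memory

# Good events land in the normal directory; bad ones land in a
# directory that a separate process can watch, as Jimmy suggests.
agent.sinks.good-hdfs.type = hdfs
agent.sinks.good-hdfs.channel = good-ch
agent.sinks.good-hdfs.hdfs.path = hdfs://namenode/flume/events/good
agent.sinks.bad-hdfs.type = hdfs
agent.sinks.bad-hdfs.channel = bad-ch
agent.sinks.bad-hdfs.hdfs.path = hdfs://namenode/flume/events/bad
```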
> >
> > Typically people will either log it to a file or send it through
> > another "flow" to a different HDFS sink.
> >
> > > Right now, all the logic for catching bad JSON would be inside the
> > > "convert" function of the EventSerializer. Should the convert
> > > function itself throw an exception that will be gracefully handled
> > > upstream?
> >
> > The exception will be logged, but that is it.
> >
> > > Or do I just return a "null" value if there was an error? Would it
> > > be appropriate to log errors directly to a database from inside the
> > > EventSerializer convert method, or would this be too slow?
> >
> > That might be too slow to do directly. If I did that, I'd have a
> > separate thread doing it and an in-memory queue between the
> > serializer and the thread.
> >
> > > What are the best practices for this type of error handling?
> >
> > It looks to me like we'd need to change AbstractAvroEventSerializer
> > to filter out nulls:
> >
> > https://github.com/apache/flume/blob/trunk/flume-ng-core/src/main/java/org/apache/flume/serialization/AbstractAvroEventSerializer.java#L106
> >
> > which we could easily do. Since you don't want to wait for that, you
> > could override the write method to do this.
> >
> > > Thank you for any assistance!
> > >
> > > Best Regards,
> > >
> > > Ed
> >
> > --
> > Apache MRUnit - Unit testing MapReduce - http://mrunit.apache.org
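The "return null from convert and filter it in an overridden write" idea Brock outlines can be sketched in plain Java. This is a standalone illustration of the control flow, not the real AbstractAvroEventSerializer API: the class name and method shapes are simplified stand-ins, using the malformed "port" field from Ed's example as the validation target.

```java
import java.util.ArrayList;
import java.util.List;

public class NullFilteringSerializer {
    // Stand-in for convert(): returns null instead of throwing when the
    // event is malformed, signalling "skip this event".
    public static Integer convert(String rawPort) {
        try {
            int port = Integer.parseInt(rawPort.trim());
            return (port >= 0 && port <= 65535) ? port : null; // reject out-of-range
        } catch (NumberFormatException e) {
            return null; // e.g. ASCII text in the "port" field
        }
    }

    private final List<Integer> sink = new ArrayList<>(); // stand-in for the Avro writer

    // Stand-in for an overridden write(): drop nulls rather than passing
    // them downstream; a real override would also log the bad record.
    public void write(String rawEvent) {
        Integer record = convert(rawEvent);
        if (record == null) {
            return; // malformed event filtered out here
        }
        sink.add(record);
    }

    public List<Integer> written() {
        return sink;
    }
}
```

In the real serializer the same shape applies: convert() maps the event body to an Avro record or null, and the overriding write() checks for null before handing the record to the Avro file writer.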

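Brock's suggestion of a separate thread plus an in-memory queue for slow error logging (e.g. to a database) might look like the following sketch; the class name is made up, and an in-memory list stands in for the actual database write.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class AsyncBadRecordLogger {
    // Bounded queue so a flood of bad records cannot exhaust memory.
    private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(1000);
    private final List<String> store = new ArrayList<>(); // stand-in for a real database

    public AsyncBadRecordLogger() {
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    // Block until a bad record arrives; the slow database
                    // write would happen here, off the serializer's path.
                    String bad = queue.take();
                    synchronized (store) {
                        store.add(bad);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // exit cleanly
            }
        });
        worker.setDaemon(true); // never keep the agent's JVM alive
        worker.start();
    }

    // Called from the serializer's hot path; offer() never blocks, so a
    // slow logger drops records instead of stalling the HDFS sink.
    public boolean submit(String badJson) {
        return queue.offer(badJson);
    }

    public int loggedSoFar() {
        synchronized (store) {
            return store.size();
        }
    }
}
```

The non-blocking offer() is the key design choice: the serializer only ever pays the cost of an enqueue, and overload degrades by dropping bad-record log entries rather than backing up the sink.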