You could implement a custom schema registry that converts the protos to schemas on the fly and caches them.
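The Descriptor-to-Avro mapping itself is fairly mechanical. A rough, untested sketch of the convert-and-cache part (ProtoSchemaCache is just an illustrative name, and it only handles scalar fields; repeated, nested and enum fields would need more work):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Descriptors.FieldDescriptor;

/** Illustrative sketch: derive an Avro schema from a protobuf Descriptor and cache it. */
public class ProtoSchemaCache {

    private final ConcurrentMap<String, Schema> cache = new ConcurrentHashMap<>();

    public Schema schemaFor(Descriptor descriptor) {
        // Key by the fully qualified message name; convert once, reuse afterwards
        return cache.computeIfAbsent(descriptor.getFullName(), k -> convert(descriptor));
    }

    private Schema convert(Descriptor descriptor) {
        SchemaBuilder.FieldAssembler<Schema> fields =
                SchemaBuilder.record(descriptor.getName()).fields();
        for (FieldDescriptor field : descriptor.getFields()) {
            // Scalar fields only; repeated, nested and enum fields are left as an exercise
            switch (field.getJavaType()) {
                case INT:     fields = fields.requiredInt(field.getName());     break;
                case LONG:    fields = fields.requiredLong(field.getName());    break;
                case FLOAT:   fields = fields.requiredFloat(field.getName());   break;
                case DOUBLE:  fields = fields.requiredDouble(field.getName());  break;
                case BOOLEAN: fields = fields.requiredBoolean(field.getName()); break;
                default:      fields = fields.requiredString(field.getName());  break;
            }
        }
        return fields.endRecord();
    }
}

A custom controller service could then serve the cached schemas to whichever record readers and writers need them.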
On December 18, 2018 at 13:55:47, James Srinivasan (james.sriniva...@gmail.com) wrote:

Yup, my example used made-up fields to keep it simple. In reality I have
between 20 and 80 fields per schema, with some nesting, arrays etc.

It might be useful if I explained what I'm currently doing and why I'm not
using the record approach:

I've got c.20 different data streams in protobuf [1] format, which I need
to put into GeoMesa [2] and ElasticSearch. The best format for those two is
JSON. I've written my own processor [3] to convert protobuf to its
canonical JSON encoding. Unlike Avro, protobuf data does not contain the
schema, so this processor requires the .proto schema. Finally, I've written
GeoMesa converters from JSON into what GeoMesa requires (ElasticSearch just
works). These converters are actually generated automatically from the
.proto schema. So far, so good.

For enrichment, I can pull out various JSON elements, look them up in HBase
and (thanks to Andrew/Matt) merge the results back into the outgoing JSON.

The record approach would allow me to do all of the above, but in a more
strongly typed way, and would probably be more performant. However, I'd
have to write and maintain a NiFi record schema (Avro schema?) for each of
the .proto schemas (assuming that is possible), which seemed like overhead
for little potential gain. I could instead convert protobuf to Avro at the
start, but that seemed non-trivial.

I guess the main underlying question is how NiFi record schemas are meant
to work when the source data already has its own schema definition
language (right now this is only Avro?).

Hope this makes some sense; I'm certainly not against using records given
more time & effort.

James

[1] https://developers.google.com/protocol-buffers/
[2] https://geomesa.org
[3] Which I hope to contribute back

On Mon, 17 Dec 2018 at 18:24, Bryan Bende <bbe...@gmail.com> wrote:
>
> I know you mentioned staying schema agnostic, but if you went with the
> record approach then this sounds like a good fit for the HBase lookup
> service.
>
> Steps 3-5 would be using LookupRecord with an HBaseLookupService where
> you look up by row id, and put the results into the current record.
>
> I'm not sure if your example used made-up fields, but if not, then
> you'd just need a schema that had the 5 fields defined.
>
> On Mon, Dec 17, 2018 at 1:01 PM Andrew Grande <apere...@gmail.com> wrote:
> >
> > James,
> >
> > The easiest would be to merge the JSON in a custom processor. Not easy
> > as in no work at all, but given your limitations with the NiFi version
> > it could perhaps be done sooner.
> >
> > Andrew
> >
> > On Mon, Dec 17, 2018, 9:53 AM James Srinivasan <james.sriniva...@gmail.com> wrote:
> >>
> >> Hi all,
> >>
> >> I'm trying to enrich a data stream using NiFi. So far I have the following:
> >>
> >> 1) Stream of vehicle data in JSON format containing (id, make, model)
> >> 2) This vehicle data goes into HBase, using id as the row key and the
> >> JSON data as the cell value (cf:json)
> >> 3) Stream of position data in JSON format, containing (id, lat, lon)
> >> 4) I extract the id from each of these items, then use FetchHBaseRow
> >> to populate the hbase.row attribute with the JSON content
> >> corresponding to that vehicle
> >> 5) I want to merge the NiFi attribute (which is actually JSON) into
> >> the rest of the content, so I end up with (id, lat, lon, make, model).
> >> This is where I am stuck - using the Jolt processor, I keep getting
> >> "unable to unmarshal json to an object".
> >>
> >> Caveats:
> >>
> >> 1) I'm on NiFi 1.3
> >> 2) Much as I would like to use the new record functionality, I'm
> >> trying to stay as schema agnostic as possible
> >>
> >> Is this the right approach? Is there an easy way to add the attribute
> >> value as a valid JSON object? Maybe ReplaceText capturing the trailing
> >> } would work?
> >>
> >> Thanks in advance,
> >>
> >> James
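As for the merge itself (per Andrew's custom-processor suggestion above), the core is only a few lines of Jackson. A minimal, untested sketch; the class and method names are made up and the NiFi flowfile/session plumbing is omitted:

import java.io.IOException;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class JsonAttributeMerge {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    /** Merge the JSON held in an attribute (e.g. hbase.row) into the JSON flowfile content. */
    public static String merge(String flowFileJson, String attributeJson) throws IOException {
        ObjectNode content = (ObjectNode) MAPPER.readTree(flowFileJson);   // e.g. {"id":..,"lat":..,"lon":..}
        ObjectNode lookup  = (ObjectNode) MAPPER.readTree(attributeJson);  // e.g. {"id":..,"make":..,"model":..}
        content.setAll(lookup);  // copy make/model (and id) onto the position record
        return MAPPER.writeValueAsString(content);
    }
}

Note that setAll overwrites any field present in both documents, which should be harmless here since id is the join key anyway.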