> I noticed in the Jira that you pointed me at that there has been some
> activity on the deserializer - will that do what I'm looking for now,
> without writing one?

Yes, Jim. I think the patch in that jira will be helpful for you. Please give it a try.

On 6 January 2017 at 00:45, Jim Langston <[email protected]> wrote:

Hi Laxman - I noticed in the Jira that you pointed me at that there has been
some activity on the deserializer - will that do what I'm looking for now,
without writing one? If so, can I get hold of it to implement? I'm on
apache-flume-1.6.0-bin.

I have tried to use this:

agent.sinks.persistence-sink.sink.serializer = com.adaltas.flume.serialization.HeaderAndBodyTextEventSerializer$Builder

and it does put the files out to disk with the header and the body. I tried
it with spooldir, but nothing came out on the other end, so I take it that
I'm not paired correctly.

Thanks,
Jim
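(One plausible reason for the silence on the other end, offered as a guess:
HeaderAndBodyTextEventSerializer writes plain text files, so a spooldir
source still configured with deserializer = avro cannot parse them. The
matching text pairing would be the built-in LINE deserializer, with the
caveat that the original headers come back only as text inside the event
body, not as Flume headers:

# Sketch: pair text output with the LINE deserializer on the spooldir side.
agent.sources.persistence-dev-source.deserializer = LINE
)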
------------------------------
From: Jim Langston <[email protected]>
Sent: Tuesday, January 3, 2017 1:59:15 PM
To: [email protected]
Subject: Re: Avro configuration

Hi - yes, I got the gist from your initial response and went off to find
examples of writing and deploying a serializer. Before starting to do so, I
realized what file_roll was really doing; that's why I got confused by your
next response. I still haven't found anything that gives guidance on doing
this.

What I want to do is relatively simple (or so I thought): a production
stream provides the initial avro data, and the data needs to be used in a
staging and a development environment. The problem is that if staging or
development is down, it affects the production flumes. I want to stop that.
My plan was to save the data to disk (it does not need to be real time),
then scoop it out and move it on. file_roll loses the data integrity by
wrapping it, and spooldir picks up the avro headers of the wrapping, not of
the original source.

I will look into writing a serializer that file_roll can use that will place
the events on disk without wrapping them.

Jim

------------------------------
From: Laxman Ch <[email protected]>
Sent: Friday, December 30, 2016 11:19:35 PM
To: [email protected]
Subject: Re: Avro configuration

Apologies for any confusion, Jim.

You need to keep the serializer and deserializer in sync. That's the gist of
my first response.

hdfs.AvroEventSerializer expects some headers. We may need to set some
headers on the event at the source. Can you please post your sample code &
config for the end-to-end flow in git?

On 31 December 2016 at 04:51, Jim Langston <[email protected]> wrote:

Ok - after messing with this for quite a while, I am getting this error:

FlumeException: Could not find schema for event

I added a schema and placed it in hdfs:

agent.sinks.persistence-sink.type = file_roll
agent.sinks.persistence-sink.sink.directory = /home/flume/persistence
agent.sinks.persistence-sink.sink.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
agent.sinks.persistence-sink.sink.serializer.schemaURL = hdfs://namenode/persistence/persistence.avsc
# Holders - can be used for tuning; the defaults are these values
agent.sinks.persistence-sink.batchSize = 1000
agent.sinks.persistence-sink.sink.rollInterval = 300

Looking at the configuration syntax for file_roll, can it traverse
serializer.schemaURL? I see where HDFS has serializer and serializer.*, but
file_roll does not.

Also, looking at the source code, it looks like the only way this error
message can be generated is if the schemaURL is null:

private void initialize(Event event) throws IOException {
  Schema schema = null;
  String schemaUrl = event.getHeaders().get(AVRO_SCHEMA_URL_HEADER);
  if (schemaUrl != null) {
    schema = schemaCache.get(schemaUrl);
    if (schema == null) {
      schema = loadFromUrl(schemaUrl);
      schemaCache.put(schemaUrl, schema);
    }
  }
  if (schema == null) {
    String schemaString = event.getHeaders().get(AVRO_SCHEMA_LITERAL_HEADER);
    if (schemaString == null) {
      throw new FlumeException("Could not find schema for event " + event);
    }
    schema = new Schema.Parser().parse(schemaString);
  }
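(That reading matches the code quoted above: this version of the serializer
takes the schema only from event headers - AVRO_SCHEMA_URL_HEADER, i.e.
flume.avro.schema.url, or the flume.avro.schema.literal fallback - so the
sink-side serializer.schemaURL property is never consulted. One way to
satisfy the lookup is to stamp the header at the source with a static
interceptor. A minimal sketch; "prod-avro-source" is a placeholder for the
actual avro source name, and the URL reuses the one from the config above:

# Sketch: set the schema URL header on every event at the source, so that
# AvroEventSerializer.initialize() can find it at the file_roll sink.
agent.sources.prod-avro-source.interceptors = schemaHeader
agent.sources.prod-avro-source.interceptors.schemaHeader.type = static
agent.sources.prod-avro-source.interceptors.schemaHeader.key = flume.avro.schema.url
agent.sources.prod-avro-source.interceptors.schemaHeader.value = hdfs://namenode/persistence/persistence.avsc
)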
------------------------------
From: Jim Langston <[email protected]>
Sent: Friday, December 30, 2016 8:34:42 AM
To: [email protected]
Subject: Re: Avro configuration

Thanks - I was following your initial response:

- Use a custom AvroEventSerializer to directly serialize the avro event and
use it with the file_roll sink. A reference implementation is available in
the hdfs sink (org.apache.flume.sink.hdfs.AvroEventSerializer). You may
strip the hdfs dependencies from it to achieve what you want.

That seems to indicate that I need to write a custom serializer following
the constructs of the hdfs sink. What I couldn't find were good examples of
creating custom serializers and then deploying them. I was tempted to post
to the developers group, but chose to keep the context within this thread.

Your new response has me confused - it seems to indicate that all I need to
do is use the hdfs serializer.

Jim

------------------------------
From: Laxman Ch <[email protected]>
Sent: Thursday, December 29, 2016 3:22:04 PM
To: [email protected]
Subject: Re: Avro configuration

Directly configure this serializer on the file_roll sink in your
configuration:

agent.sinks.persistence-sink.sink.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer

Please go through the documentation before you proceed:
https://flume.apache.org/FlumeUserGuide.html#avro-event-serializer

On 29 December 2016 at 23:29, Jim Langston <[email protected]> wrote:

Thanks - I have pulled the flume source. Are there any good step-by-step
examples of creating a custom serializer and deploying it for flume to use?

Jim

------------------------------
From: Laxman Ch <[email protected]>
Sent: Wednesday, December 28, 2016 12:05:08 AM
To: [email protected]
Subject: Re: Avro configuration

Jim,

In one line: FlumeEventAvroEventSerializer and AvroEventDeserializer are not
in sync, and they can't be used as a serde pair.

Flume's built-in avro serializer, FlumeEventAvroEventSerializer, serializes
Flume events with a shell. It's important to note that the actual raw event
is wrapped inside the Flume shell object, and this raw body is treated as
binary (which can be thrift, avro, or just a byte array, etc.). Flume's
built-in avro deserializer, AvroEventDeserializer, deserializes any generic
serialized event and wraps the deserialized event into another Flume shell
object.

This means that, with your configuration, the spool directory source
(persistence-dev-source) will get a double-wrapped Flume event
(FlumeEvent -> FlumeEvent -> raw event body).

To solve this problem, we need the serializer and deserializer to be in
sync. We can achieve that with either of the following approaches.

- Use a custom FlumeEventAvroEventDeserializer to extract the FlumeEvent
directly, without the double wrapper, and use it with the spool directory
source. A similar attempt has already been made by Sebastian here:
https://issues.apache.org/jira/browse/FLUME-2942
I personally recommend writing a new FlumeEventAvroEventDeserializer rather
than modifying the existing one. (A rough sketch of what such a class might
look like follows below.)

- Use a custom AvroEventSerializer to directly serialize the avro event and
use it with the file_roll sink. A reference implementation is available in
the hdfs sink (org.apache.flume.sink.hdfs.AvroEventSerializer). You may
strip the hdfs dependencies from it to achieve what you want.
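A rough sketch of the first approach, for orientation only - this is not the
FLUME-2942 patch. The package name is invented, it assumes the
EventDeserializer and ResettableInputStream APIs as of Flume 1.6, and
mark()/reset() are deliberately simplistic compared to the Avro sync-marker
tracking that the built-in AvroEventDeserializer does:

package com.example.flume.serialization;  // hypothetical package

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.serialization.EventDeserializer;
import org.apache.flume.serialization.ResettableInputStream;

public class FlumeEventAvroEventDeserializer implements EventDeserializer {

  private final ResettableInputStream in;
  private final DataFileStream<GenericRecord> fileReader;

  private FlumeEventAvroEventDeserializer(final ResettableInputStream in)
      throws IOException {
    this.in = in;
    // ResettableInputStream is not a java.io.InputStream; adapt it for Avro.
    InputStream adapter = new InputStream() {
      @Override public int read() throws IOException { return in.read(); }
      @Override public int read(byte[] b, int off, int len) throws IOException {
        return in.read(b, off, len);
      }
    };
    // The writer schema ({headers: map<string>, body: bytes}) is read from
    // the container file itself; no schemaType/schemaURL config is needed.
    this.fileReader = new DataFileStream<GenericRecord>(
        adapter, new GenericDatumReader<GenericRecord>());
  }

  @Override
  public Event readEvent() throws IOException {
    if (!fileReader.hasNext()) {
      return null;
    }
    GenericRecord wrapped = fileReader.next();
    // Rebuild the original event instead of wrapping the record again:
    // this is what avoids the FlumeEvent -> FlumeEvent -> body double wrap.
    Map<String, String> headers = new HashMap<String, String>();
    for (Map.Entry<?, ?> e : ((Map<?, ?>) wrapped.get("headers")).entrySet()) {
      headers.put(e.getKey().toString(), e.getValue().toString());
    }
    ByteBuffer body = (ByteBuffer) wrapped.get("body");
    byte[] bytes = new byte[body.remaining()];
    body.get(bytes);
    return EventBuilder.withBody(bytes, headers);
  }

  @Override
  public List<Event> readEvents(int numEvents) throws IOException {
    List<Event> events = new ArrayList<Event>(numEvents);
    for (int i = 0; i < numEvents; i++) {
      Event e = readEvent();
      if (e == null) break;
      events.add(e);
    }
    return events;
  }

  // Caution: a real implementation should track Avro sync markers the way
  // org.apache.flume.serialization.AvroEventDeserializer does, so that
  // reset() lands on a record boundary after a channel rollback.
  @Override public void mark() throws IOException { in.mark(); }
  @Override public void reset() throws IOException { in.reset(); }
  @Override public void close() throws IOException { fileReader.close(); in.close(); }

  public static class Builder implements EventDeserializer.Builder {
    @Override
    public EventDeserializer build(Context context, ResettableInputStream in) {
      try {
        return new FlumeEventAvroEventDeserializer(in);
      } catch (IOException e) {
        throw new RuntimeException("Cannot open Avro container file", e);
      }
    }
  }
}

Dropped on the classpath, it would replace the avro setting on the spooldir
source:

agent.sources.persistence-dev-source.deserializer = com.example.flume.serialization.FlumeEventAvroEventDeserializer$Builder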
On 28 December 2016 at 01:17, Jim Langston <[email protected]> wrote:

Hi all,

I'm looking for some guidance. I have been trying to get a flow working that
involves the following:

Avro source --> mem channel --> file_roll

File roll config:

agent.sinks.persistence-sink.type = file_roll
agent.sinks.persistence-sink.sink.directory = /home/flume/persistence
agent.sinks.persistence-sink.sink.serializer = avro_event
agent.sinks.persistence-sink.batchSize = 1000
agent.sinks.persistence-sink.sink.rollInterval = 300

Once the data is on local disk, I want to flume the data to another flume
server:

spooldir source --> mem channel --> Avro sink (to another flume server)

agent.sources.persistence-dev-source.type = spooldir
agent.sources.persistence-dev-source.spoolDir = /home/flume/ready
agent.sources.persistence-dev-source.deserializer = avro
agent.sources.persistence-dev-source.deserializer.schemaType = LITERAL
agent.sources.persistence-dev-source.batchSize = 1000

The problem is that file_roll puts the incoming Avro data into an Avro
container before storing it on the local file system. Then, when the data is
picked up by the spooldir source and sent to the flume server, it carries
the file_roll headers when read by the interceptor.

Is there a recommended way to save the incoming Avro data that maintains its
integrity when sending it on to another flume server, which is waiting on
Avro data to multiplex and send to its channels?

I have tried many different variations. With the above configurations the
received Avro data does reach the other server, but the applications see the
container headers from file_roll, not the headers from the records of the
initial Avro data.

Thanks,
Jim

Schema that gets set by file_roll on its writes to disk:

{
  "type" : "record",
  "name" : "Event",
  "fields" : [ {
    "name" : "headers",
    "type" : {
      "type" : "map",
      "values" : "string"
    }
  }, {
    "name" : "body",
    "type" : "bytes"
  } ]
}

--
Thanks,
Laxman
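Reading one of the rolled files back with the Avro generic reader makes the
wrapping visible: every record matches the Event schema above, and the
original event - its headers included - is buried inside the "body" bytes.
A sketch; the file name is made up:

import java.io.File;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

public class DumpRolledFile {
  public static void main(String[] args) throws Exception {
    // Any file produced by the file_roll sink above; the name is hypothetical.
    File f = new File("/home/flume/persistence/1483056000000-1");
    DataFileReader<GenericRecord> reader =
        new DataFileReader<GenericRecord>(f, new GenericDatumReader<GenericRecord>());
    try {
      // Prints the Event record schema shown above.
      System.out.println("writer schema: " + reader.getSchema());
      while (reader.hasNext()) {
        GenericRecord rec = reader.next();
        System.out.println("wrapper headers: " + rec.get("headers"));
        // rec.get("body") is a ByteBuffer holding the original raw event -
        // which is why downstream only ever sees file_roll's wrapper headers.
      }
    } finally {
      reader.close();
    }
  }
}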
