Tracked that error down .. now getting this one:

08 Jan 2017 05:00:21,789 ERROR [pool-8-thread-1] 
(org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run:256)  
- FATAL: Spool Directory source persistence-dev-source: { spoolDir: 
/home/flume/ready }: Uncaught exception in SpoolDirectorySource thread. Restart 
or reconfigure Flume to continue processing.
org.apache.flume.FlumeException: Unable to instantiate Builder from 
org.apache.flume.serialization.FlumeEventAvroEventDeserializer: does not appear 
to implement org.apache.flume.serialization.EventDeserializer$Builder
        at 
org.apache.flume.serialization.EventDeserializerFactory.getInstance(EventDeserializerFactory.java:65)
        at 
org.apache.flume.client.avro.ReliableSpoolingFileEventReader.openFile(ReliableSpoolingFileEventReader.java:520)
        at 
org.apache.flume.client.avro.ReliableSpoolingFileEventReader.getNextFile(ReliableSpoolingFileEventReader.java:484)
        at 
org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:243)
        at 
org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:228)
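
If I'm reading EventDeserializerFactory right, it instantiates whatever class the
deserializer property names and requires that class to implement
EventDeserializer.Builder. A rough sketch of the shape it seems to want is below -
the class and package names here are placeholders of my own, not the actual
FLUME-2942 code:

import java.io.IOException;
import java.util.Collections;
import java.util.List;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.serialization.EventDeserializer;
import org.apache.flume.serialization.ResettableInputStream;

public class MyFlumeEventAvroDeserializer implements EventDeserializer {

  private final ResettableInputStream in;

  private MyFlumeEventAvroDeserializer(Context context, ResettableInputStream in) {
    // context would carry any deserializer.* properties; unused in this sketch
    this.in = in;
  }

  @Override
  public Event readEvent() throws IOException {
    // a real implementation would read one avro-serialized FlumeEvent from "in"
    return null;
  }

  @Override
  public List<Event> readEvents(int numEvents) throws IOException {
    // a real implementation would loop over readEvent() up to numEvents
    return Collections.emptyList();
  }

  @Override
  public void mark() throws IOException {
    in.mark();
  }

  @Override
  public void reset() throws IOException {
    in.reset();
  }

  @Override
  public void close() throws IOException {
    in.close();
  }

  // The nested Builder is the piece the exception is complaining about; the
  // spooldir deserializer property would then point at the fully qualified
  // ...MyFlumeEventAvroDeserializer$Builder rather than at the deserializer itself.
  public static class Builder implements EventDeserializer.Builder {
    @Override
    public EventDeserializer build(Context context, ResettableInputStream in) {
      return new MyFlumeEventAvroDeserializer(context, in);
    }
  }
}

So my guess is that either the patched class doesn't expose a Builder like this, or
I need to point the deserializer property at its $Builder nested class instead of
the deserializer class itself.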




Thoughts ?


Jim

________________________________
From: Jim Langston <[email protected]>
Sent: Saturday, January 7, 2017 11:14:02 PM
To: [email protected]
Subject: Re: Avro configuration


Hi Laxman - gave it a shot, but I'm getting this exception:


org.apache.flume.FlumeException: Unable to instantiate Builder from 
org.apache.flume.serialization.FlumeEventAvroEventDeserializer: does not appear 
to implement org.apache.flume.serialization.EventDeserializer$Builder
        at 
org.apache.flume.serialization.EventDeserializerFactory.getInstance(EventDeserializerFactory.java:65)


Jim

________________________________
From: Laxman Ch <[email protected]>
Sent: Thursday, January 5, 2017 4:49:31 PM
To: [email protected]
Subject: Re: Avro configuration

> I noticed in the Jira that you pointed me at that there has been some 
> activity on the deserializer - will that do what I'm looking for now ? 
> without writing one ?

Yes Jim. I think the patch in that jira will be helpful for you. Please give it a try.

On 6 January 2017 at 00:45, Jim Langston 
<[email protected]> wrote:

Hi Laxman - I noticed in the Jira that you pointed me at that there has been 
some activity on the deserializer - will that do what I'm looking for now ? 
without writing one ? If so, can I get ahold of it to implement ? I'm on 
apache-flume-1.6.0-bin



I have tried to use this: agent.sinks.persistence-sink.sink.serializer = 
com.adaltas.flume.serialization.HeaderAndBodyTextEventSerializer$Builder


and it does put the files out to disk with the header and the body. I tried 
reading them back with spooldir, but nothing comes out on the other end, so I 
take it that I'm not pairing them correctly.



Thanks,


Jim

________________________________
From: Jim Langston <[email protected]>
Sent: Tuesday, January 3, 2017 1:59:15 PM

To: [email protected]
Subject: Re: Avro configuration


Hi - yes - I got the gist from your initial response and went off to find 
examples of writing and deploying a serializer. Before starting to do so I 
realized what file_roll was really doing, which is why I got confused by your 
next response. I still haven't found anything that gives guidance on writing 
and deploying one.


What I want to do is relatively simple (or so I thought): a production stream 
is providing the initial Avro data, and that data needs to be used in a staging 
and a development environment. The problem is that if staging or development is 
down, it affects the production flumes. I want to stop that by saving the data 
to disk (it is not important to be real time), then scooping it out and moving 
it on. file_roll loses the data integrity by wrapping it, and spooldir picks up 
the Avro headers of the wrapping, not of the original source.


I will look into writing a serializer that file_roll can use to write the events 
to disk without wrapping them.


Jim

________________________________
From: Laxman Ch <[email protected]>
Sent: Friday, December 30, 2016 11:19:35 PM
To: [email protected]
Subject: Re: Avro configuration

Apologies for any confusion Jim.

You need to keep the serializer and deserializer in sync. That's the gist of my 
first response.

hdfs.AvroEventSerializer expects certain headers, so we may need to set those 
headers on the event at the source - for example with a static interceptor, as 
sketched below.
Can you please post your sample code & config for the end-to-end flow in git?
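
For example, a static interceptor on the source that feeds the file_roll sink 
can stamp the schema URL header onto every event. This is only a sketch: the 
source and interceptor names are placeholders, the URL is the one from your 
config below, and flume.avro.schema.url is the header key AvroEventSerializer 
looks up (its AVRO_SCHEMA_URL_HEADER constant).

agent.sources.prod-avro-source.interceptors = schemaHeader
agent.sources.prod-avro-source.interceptors.schemaHeader.type = static
agent.sources.prod-avro-source.interceptors.schemaHeader.key = flume.avro.schema.url
agent.sources.prod-avro-source.interceptors.schemaHeader.value = hdfs://namenode/persistence/persistence.avsc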

On 31 December 2016 at 04:51, Jim Langston 
<[email protected]> wrote:

Ok - after messing with this for quite a while, I am getting this error:


FlumeException: Could not find schema for event


I added a schema and placed it in hdfs


agent.sinks.persistence-sink.type = file_roll
agent.sinks.persistence-sink.sink.directory = /home/flume/persistence
agent.sinks.persistence-sink.sink.serializer = 
org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
agent.sinks.persistence-sink.sink.serializer.schemaURL = 
hdfs://namenode/persistence/persistence.avsc
# Holders , can be used for tuning, defaults are the values
agent.sinks.persistence-sink.batchSize = 1000
agent.sinks.persistence-sink.sink.rollInterval = 300


Looking at the configuration syntax for file_roll, can it traverse 
serializer.schemaURL? I see that HDFS has serializer and serializer.*, but 
file_roll does not.

Also, looking at the source code, it looks like the only way this error message 
can be generated is if the schemaURL header is null (and there is no schema 
literal header either):

private void initialize(Event event) throws IOException {
    Schema schema = null;
    String schemaUrl = event.getHeaders().get(AVRO_SCHEMA_URL_HEADER);
    if (schemaUrl != null) {
        schema = schemaCache.get(schemaUrl);
        if (schema == null) {
            schema = loadFromUrl(schemaUrl);
            schemaCache.put(schemaUrl, schema);
        }
    }
    if (schema == null) {
        String schemaString = event.getHeaders().get(AVRO_SCHEMA_LITERAL_HEADER);
        if (schemaString == null) {
            throw new FlumeException("Could not find schema for event " + event);
        }
        schema = new Schema.Parser().parse(schemaString);
    }





________________________________
From: Jim Langston <[email protected]>
Sent: Friday, December 30, 2016 8:34:42 AM

To: [email protected]
Subject: Re: Avro configuration


thanks - I was following your initial response


- Use a custom AvroEventSerializer to directly serialize the avro event and use 
it with the file_roll sink.
A reference implementation is available in the hdfs sink 
(org.apache.flume.sink.hdfs.AvroEventSerializer).
You may strip the hdfs dependencies off of it to achieve what you want.

This seems to indicate that I need to write a custom serializer following the 
constructs of the hdfs sink. What I couldn't find were good examples of 
creating custom serializers and then deploying them. I was tempted to post to 
the developers group, but chose to keep the context within this thread.

Your new response has me confused; it seems to indicate that all I need to do 
is use the hdfs serializer.


Jim


________________________________
From: Laxman Ch <[email protected]>
Sent: Thursday, December 29, 2016 3:22:04 PM
To: [email protected]
Subject: Re: Avro configuration

Directly configure this serializer on the file_roll sink in your configuration.
>> agent.sinks.persistence-sink.sink.serializer = 
>> org.apache.flume.sink.hdfs.AvroEventSerializer

Please go through the documentation before you proceed.
https://flume.apache.org/FlumeUserGuide.html#avro-event-serializer


On 29 December 2016 at 23:29, Jim Langston 
<[email protected]> wrote:

Thanks - I have pulled the flume source. Are there any good step-by-step 
examples of creating a custom serializer and deploying it for flume to use?


Jim

________________________________
From: Laxman Ch <[email protected]>
Sent: Wednesday, December 28, 2016 12:05:08 AM
To: [email protected]
Subject: Re: Avro configuration

Jim,

In one line: FlumeEventAvroEventSerializer and AvroEventDeserializer are not in 
sync, so they can't be used as a serde pair.

Flume's built-in avro serializer, FlumeEventAvroEventSerializer, serializes 
Flume events with a shell. It's important to note that here the actual raw 
event is wrapped inside the flume shell object, and this raw body is treated as 
binary (which can be thrift, avro, or just a byte array, etc.).
Flume's built-in avro deserializer, AvroEventDeserializer, deserializes any 
generic avro-serialized event and wraps the deserialized event into another 
flume shell object.

This means that, as per your configuration, the spool directory source 
(persistence-dev-source) will get a double-wrapped flume event (FlumeEvent -> 
FlumeEvent -> raw event body).

To solve this problem, we need the serializer and deserializer to be in sync. 
We can achieve that with either of the following approaches.
- Use a custom FlumeEventAvroEventDeserializer that extracts the FlumeEvent 
directly, without the double wrapping, and use it with the spool directory source.

A similar attempt has already been made by Sebastian here:
https://issues.apache.org/jira/browse/FLUME-2942

I personally recommend writing a FlumeEventAvroEventDeserializer rather than 
modifying the existing one.

- Use a custom AvroEventSerializer to directly serialize the avro event and use 
it with the file_roll sink.
A reference implementation is available in the hdfs sink 
(org.apache.flume.sink.hdfs.AvroEventSerializer).
You may strip the hdfs dependencies off of it to achieve what you want.
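
To make the pairing concrete, the wiring for the two options would look roughly 
like this (the com.example class names are placeholders for whatever custom 
class you end up writing, and its jar would need to be on the agent's 
classpath, e.g. under plugins.d):

# Option 1: keep the stock avro_event serializer (FlumeEventAvroEventSerializer)
# on the file_roll side and pair it with a custom deserializer on the spooldir side.
agent.sinks.persistence-sink.sink.serializer = avro_event
agent.sources.persistence-dev-source.deserializer = com.example.FlumeEventAvroEventDeserializer$Builder

# Option 2: write a serializer without the hdfs dependencies and keep the stock
# avro deserializer on the spooldir side.
agent.sinks.persistence-sink.sink.serializer = com.example.RawAvroEventSerializer$Builder
agent.sources.persistence-dev-source.deserializer = avro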


On 28 December 2016 at 01:17, Jim Langston 
<[email protected]> wrote:

Hi all,


I'm looking for some guidance. I have been trying to get a flow working that 
involves the following:


Source Avro --> mem channel --> file_roll


File Roll config


agent.sinks.persistence-sink.type = file_roll
agent.sinks.persistence-sink.sink.directory = /home/flume/persistence
agent.sinks.persistence-sink.sink.serializer = avro_event
agent.sinks.persistence-sink.batchSize = 1000
agent.sinks.persistence-sink.sink.rollInterval = 300


Once the data is on local disk, I want to flume the data to another flume server

Source spooldir --> mem channel --> Avro Sink (to another flume server)

agent.sources.persistence-dev-source.type = spooldir
agent.sources.persistence-dev-source.spoolDir = /home/flume/ready
agent.sources.persistence-dev-source.deserializer = avro
agent.sources.persistence-dev-source.deserializer.schemaType = LITERAL
agent.sources.persistence-dev-source.batchSize = 1000
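
(The avro sink end of that second hop isn't shown above; it is just the stock 
avro sink pointed at the other flume server, roughly along these lines, with a 
placeholder sink name, hostname and port:)

agent.sinks.persistence-dev-sink.type = avro
agent.sinks.persistence-dev-sink.hostname = dev-flume-host
agent.sinks.persistence-dev-sink.port = 4545
agent.sinks.persistence-dev-sink.batch-size = 1000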



The problem is that file_roll will put the incoming Avro data into an Avro 
container before storing the data on the local file system. Then, when the data 
is picked up by the spooldir source and sent to the flume server, it carries the 
file_roll headers when it is read by the interceptor.

Is there a recommended way to save the incoming Avro data that will maintain 
its integrity when sending it on to another flume server, which is waiting on 
Avro data to multiplex and send to its channels?

I have tried many different variations. The configurations above do get the 
received Avro data to the other server, but the problem is that the 
applications see the container headers from file_roll, and not the headers from 
the records of the initial Avro data.


Thanks,

Jim

schema that gets set by file_roll on its writes to disk:

{
  "type" : "record",
  "name" : "Event",
  "fields" : [ {
    "name" : "headers",
    "type" : {
      "type" : "map",
      "values" : "string"
    }
  }, {
    "name" : "body",
    "type" : "bytes"
  } ]
}








--
Thanks,
Laxman



