Ok - after messing with this for quite a while, I am getting this error:
FlumeException: Could not find schema for event
I added a schema and placed it in HDFS:
agent.sinks.persistence-sink.type = file_roll
agent.sinks.persistence-sink.sink.directory = /home/flume/persistence
agent.sinks.persistence-sink.sink.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
agent.sinks.persistence-sink.sink.serializer.schemaURL = hdfs://namenode/persistence/persistence.avsc
# Placeholders - can be used for tuning; the defaults are the values shown
agent.sinks.persistence-sink.batchSize = 1000
agent.sinks.persistence-sink.sink.rollInterval = 300
Looking at the configuration syntax for file_roll, can it pass serializer.schemaURL through to the serializer? I see that the HDFS sink documents serializer and serializer.*, but file_roll does not.
Also, looking at the source code, it looks like the only way this error message can be generated is if the schema URL header is missing (so schema stays null) and there is no schema literal header either:
private void initialize(Event event) throws IOException {
  Schema schema = null;
  String schemaUrl = event.getHeaders().get(AVRO_SCHEMA_URL_HEADER);
  if (schemaUrl != null) {
    schema = schemaCache.get(schemaUrl);
    if (schema == null) {
      schema = loadFromUrl(schemaUrl);
      schemaCache.put(schemaUrl, schema);
    }
  }
  if (schema == null) {
    String schemaString = event.getHeaders().get(AVRO_SCHEMA_LITERAL_HEADER);
    if (schemaString == null) {
      throw new FlumeException("Could not find schema for event " + event);
    }
    schema = new Schema.Parser().parse(schemaString);
  }
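If that is what is happening, then the schema has to arrive as an event header rather than as a sink property. As a sketch of what I think that would mean, something like a static interceptor stamping the flume.avro.schema.url header on every event - the source name persistence-source below is just a placeholder for whatever source actually feeds this sink:

# hypothetical sketch - replace persistence-source with the real source name on this agent
agent.sources.persistence-source.interceptors = schemaHeader
agent.sources.persistence-source.interceptors.schemaHeader.type = static
agent.sources.persistence-source.interceptors.schemaHeader.key = flume.avro.schema.url
agent.sources.persistence-source.interceptors.schemaHeader.value = hdfs://namenode/persistence/persistence.avsc

Would that be the expected way to feed the serializer its schema, or am I off track?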
________________________________
From: Jim Langston <[email protected]>
Sent: Friday, December 30, 2016 8:34:42 AM
To: [email protected]
Subject: Re: Avro configuration
thanks - I was following your initial response:
- Use a custom AvroEventSerializer to directly serialize the avro event and use it with the file_roll sink. A reference implementation is available in the hdfs sink (org.apache.flume.sink.hdfs.AvroEventSerializer). You may strip off the hdfs dependencies from it to achieve what you want.
This seems to indicate that I need to write a custom serializer following the constructs of the hdfs sink. What I couldn't find were good examples of creating custom serializers and then deploying them. I was tempted to post to the developers group, but chose to keep the context within this thread.
Your new response has me confused - it seems to indicate that all I need to do is use the hdfs serializer.
Jim
________________________________
From: Laxman Ch <[email protected]>
Sent: Thursday, December 29, 2016 3:22:04 PM
To: [email protected]
Subject: Re: Avro configuration
Directly configure this serializer to file_roll sink in your configuration.
>> agent.sinks.persistence-sink.sink.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer
Please go through the documentation before you proceed.
https://flume.apache.org/FlumeUserGuide.html#avro-event-serializer
On 29 December 2016 at 23:29, Jim Langston <[email protected]> wrote:
Thanks - I have pulled the Flume source. Are there any good step-by-step examples of creating a custom serializer and deploying it for Flume to use?
Jim
________________________________
From: Laxman Ch <[email protected]>
Sent: Wednesday, December 28, 2016 12:05:08 AM
To: [email protected]
Subject: Re: Avro configuration
Jim,
In one line: FlumeEventAvroEventSerializer and AvroEventDeserializer are not in sync, so they can't be used as a serde pair.
Flume's built-in avro serializer, FlumeEventAvroEventSerializer, serializes Flume events together with their shell. It's important to note that here the actual raw event is wrapped inside the Flume shell object, and that raw body is treated as binary (which can be thrift, avro, or just a byte array, etc.).
Flume's built-in avro deserializer, AvroEventDeserializer, deserializes any generic serialized event and wraps the deserialized event into another Flume shell object.
This means that, per your configuration, the spool directory source (persistence-dev-source) will get a double-wrapped Flume event (FlumeEvent -> FlumeEvent -> raw event body).
To solve this problem, we need the serializer and deserializer to be in sync. We can achieve that with either of the following approaches.
- Use a custom FlumeEventAvroEventDeserializer to extract the FlumeEvent directly, without the double wrapper, and use it with the spool directory source. A similar attempt has already been made by Sebastian here:
https://issues.apache.org/jira/browse/FLUME-2942
I personally recommend writing a new FlumeEventAvroEventDeserializer rather than modifying the existing one.
- Use a custom AvroEventSerializer to directly serialize the avro event and use it with the file_roll sink. A reference implementation is available in the hdfs sink (org.apache.flume.sink.hdfs.AvroEventSerializer). You may strip off the hdfs dependencies from it to achieve what you want; a rough sketch of that idea follows below.
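To make that a bit more concrete, here is a minimal sketch of what such a stripped-down serializer could look like. It assumes the event bodies are already binary-encoded Avro datums and that the schema arrives in the flume.avro.schema.literal header; the class and package names are made up, and this is only an illustration of the approach, not a tested implementation.

package com.example.flume; // hypothetical package and class name

import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.serialization.EventSerializer;

// Writes already-Avro event bodies into an Avro container file, taking the
// schema from the flume.avro.schema.literal header of the first event.
// No HDFS dependencies are involved.
public class RawAvroEventSerializer implements EventSerializer {

  private static final String SCHEMA_LITERAL_HEADER = "flume.avro.schema.literal";

  private final OutputStream out;
  private DataFileWriter<GenericRecord> writer;

  private RawAvroEventSerializer(OutputStream out) {
    this.out = out;
  }

  @Override
  public void afterCreate() throws IOException {
    // Nothing to do until the first event arrives and tells us the schema.
  }

  @Override
  public void afterReopen() throws IOException {
    // Avro container files cannot simply be appended to after reopening.
    throw new UnsupportedOperationException("reopen not supported");
  }

  @Override
  public void write(Event event) throws IOException {
    if (writer == null) {
      String literal = event.getHeaders().get(SCHEMA_LITERAL_HEADER);
      if (literal == null) {
        throw new FlumeException("No " + SCHEMA_LITERAL_HEADER + " header on event");
      }
      Schema schema = new Schema.Parser().parse(literal);
      writer = new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(schema));
      writer.create(schema, out);
    }
    // The event body is already a binary-encoded Avro datum; append it as-is.
    writer.appendEncoded(ByteBuffer.wrap(event.getBody()));
  }

  @Override
  public void flush() throws IOException {
    if (writer != null) {
      writer.flush();
    }
  }

  @Override
  public void beforeClose() throws IOException {
    flush();
  }

  @Override
  public boolean supportsReopen() {
    return false;
  }

  public static class Builder implements EventSerializer.Builder {
    @Override
    public EventSerializer build(Context context, OutputStream out) {
      return new RawAvroEventSerializer(out);
    }
  }
}

Build it into a jar, put it on the Flume classpath (for example under plugins.d), and point the sink at it with something like agent.sinks.persistence-sink.sink.serializer = com.example.flume.RawAvroEventSerializer$Builder.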
On 28 December 2016 at 01:17, Jim Langston <[email protected]> wrote:
Hi all,
I'm looking for some guidance. I have been trying to get a flow working that involves the following:
Source Avro --> mem channel --> file_roll
File Roll config
agent.sinks.persistence-sink.type = file_roll
agent.sinks.persistence-sink.sink.directory = /home/flume/persistence
agent.sinks.persistence-sink.sink.serializer = avro_event
agent.sinks.persistence-sink.batchSize = 1000
agent.sinks.persistence-sink.sink.rollInterval = 300
Once the data is on local disk, I want to flume the data to another flume server
Source spooldir --> mem channel --> Avro Sink (to another flume server)
agent.sources.persistence-dev-source.type = spooldir
agent.sources.persistence-dev-source.spoolDir = /home/flume/ready
agent.sources.persistence-dev-source.deserializer = avro
agent.sources.persistence-dev-source.deserializer.schemaType = LITERAL
agent.sources.persistence-dev-source.batchSize = 1000
The problem is that file_roll will put the incoming Avro data into an Avro container before storing the data on the local file system. Then, when the data is picked up by the spooldir source and sent to the flume server, it will have the file_roll headers when being read by the interceptor.
Is there a recommended way to save the incoming Avro data that will maintain its integrity when sending it on to another flume server, which is waiting on Avro data to multiplex and send to its channels?
I have tried many different variations. With the above configuration the received Avro data does get to the other server, but the problem is that the applications see the container headers from file_roll, and not the headers of the records from the initial Avro data.
Thanks,
Jim
schema that gets set by file_roll on its writes to disk:
{
  "type" : "record",
  "name" : "Event",
  "fields" : [ {
    "name" : "headers",
    "type" : {
      "type" : "map",
      "values" : "string"
    }
  }, {
    "name" : "body",
    "type" : "bytes"
  } ]
}
--
Thanks,
Laxman