[
https://issues.apache.org/jira/browse/FLUME-3238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16545052#comment-16545052
]
Ferran Fernandez Garrido commented on FLUME-3238:
-------------------------------------------------
Hi!
After a quick review, I think the problem is not related to the Taildir Source
itself. Everything in your configuration seems to be right except for one thing:
there is a property in KafkaSink that you are not using:
--------------------------------------------------------------------------------------------
public static final String AVRO_EVENT = "useFlumeEventFormat";
--------------------------------------------------------------------------------------------
I think you should set that property to "true" so that the event headers are
written to the Kafka topic along with the body.
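As a minimal sketch of that change, assuming an agent named a1 and a Kafka sink
named k1 (both names are placeholders, use the ones from your own config), the
sink section would gain one line:

```properties
# Hypothetical agent/sink names; only the last line is the suggested change.
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.useFlumeEventFormat = true
```

Note that with this setting the messages in the topic are Avro-encoded Flume
events rather than raw log lines, so consumers have to decode them.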
The effect of the property can be seen in the KafkaSink source code on GitHub:
--------------------------------------------------------------------------------------------------
private byte[] serializeEvent(Event event, boolean useAvroEventFormat) throws IOException {
  byte[] bytes;
  if (useAvroEventFormat) {
    if (!tempOutStream.isPresent()) {
      tempOutStream = Optional.of(new ByteArrayOutputStream());
    }
    if (!writer.isPresent()) {
      writer = Optional.of(new SpecificDatumWriter<AvroFlumeEvent>(AvroFlumeEvent.class));
    }
    tempOutStream.get().reset();
    AvroFlumeEvent e = new AvroFlumeEvent(toCharSeqMap(event.getHeaders()),
                                          ByteBuffer.wrap(event.getBody()));
    encoder = EncoderFactory.get().directBinaryEncoder(tempOutStream.get(), encoder);
    writer.get().write(e, encoder);
    encoder.flush();
    bytes = tempOutStream.get().toByteArray();
  } else {
    bytes = event.getBody();
  }
  return bytes;
}
--------------------------------------------------------------------------------------------------
If the property is false (I assume that the default value is false), only the
body is serialized, so the headers, which carry the information about which
file each event was read from, are lost.
Try setting that property and see whether it solves the problem.
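To make the difference concrete, here is a small dependency-free sketch that
mirrors the two branches of serializeEvent. It is only an illustration: a plain
DataOutputStream encoding stands in for the Avro encoding used by the real
sink, and the class name, the "file" header key and the log path are
assumptions, not taken from your config.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

public class HeaderLossSketch {

    // Mirrors the branch in serializeEvent: body only, or headers plus body.
    // DataOutputStream is a stand-in for Avro, NOT what KafkaSink actually uses.
    public static byte[] serialize(Map<String, String> headers, byte[] body,
                                   boolean useFlumeEventFormat) throws IOException {
        if (!useFlumeEventFormat) {
            return body; // headers are never written, so they cannot reach the topic
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        DataOutputStream data = new DataOutputStream(out);
        data.writeInt(headers.size());
        for (Map.Entry<String, String> h : headers.entrySet()) {
            data.writeUTF(h.getKey());
            data.writeUTF(h.getValue());
        }
        data.writeInt(body.length);
        data.write(body);
        data.flush();
        return out.toByteArray();
    }

    // Reads the headers back from a payload written with useFlumeEventFormat = true.
    public static Map<String, String> readHeaders(byte[] payload) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(payload));
        int count = in.readInt();
        Map<String, String> headers = new LinkedHashMap<>();
        for (int i = 0; i < count; i++) {
            String key = in.readUTF();
            String value = in.readUTF();
            headers.put(key, value);
        }
        return headers;
    }

    public static void main(String[] args) throws IOException {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("file", "/var/log/app/example1.log"); // hypothetical key and path
        byte[] body = "some log line".getBytes(StandardCharsets.UTF_8);

        byte[] bodyOnly = serialize(headers, body, false);
        byte[] full = serialize(headers, body, true);

        // Body-only payload: just the log line, no trace of the source file.
        System.out.println(new String(bodyOnly, StandardCharsets.UTF_8));
        // Full payload: the file header can be recovered on the consumer side.
        System.out.println(readHeaders(full).get("file"));
    }
}
```

In the body-only case the consumer receives exactly the bytes of the log line,
which matches what you are seeing in the topic.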
Regards,
> TailDir Source + Kafka Sink + Memory Channel - Couldn't identify event source
> filename
> --------------------------------------------------------------------------------------
>
> Key: FLUME-3238
> URL: https://issues.apache.org/jira/browse/FLUME-3238
> Project: Flume
> Issue Type: Question
> Components: Kafka Channel
> Affects Versions: 1.8.0
> Reporter: r2d2
> Priority: Critical
> Attachments: exmple1.log, exmple2.log, taildir_flume.conf.txt
>
>
> I am using the Taildir source to consume my log events and send them to a Kafka
> sink for further processing before they are indexed into a table for analytics.
> The events received in the Kafka topic don't carry the name of the file they
> belong to, so I can't trace an event back to the file where it occurred.
> I tried many config attributes as per the documentation, but the output
> doesn't change; only the raw event is carried to the Kafka topic.
> Has anyone had a similar requirement and been able to achieve it?
> As per ticket [https://issues.cloudera.org/browse/FLUME-258], this is said to
> be resolved, but it doesn't seem to be available in newer versions.
>
> I have included my sample config and input files. If anyone has tried a similar
> implementation, please let me know how to achieve this.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)