Do you have a custom jar which contains
org.apache.flume.serialization.CustomLogAvroEventSerializer in flume classpath?
You can copy the custom jar file to <FLUME_HOME>/lib directory.
ERROR serialization.EventSerializerFactory: Unable to instantiate Builder from
org.apache.flume.serialization.CustomLogAvroEventSerializer
On Aug 2, 2012, at 8:26 PM, JP wrote:
> Hi,
>
> I'm getting errors
>
> 2012-08-02 16:58:50,065 INFO source.AvroSource: Avro source seqGenSrc started.
> 2012-08-02 16:59:02,463 INFO ipc.NettyServer: [id: 0x45cbda0a, /localhost=> /
> localhost 41414] OPEN
> 2012-08-02 16:59:02,466 INFO ipc.NettyServer: [id: 0x45cbda0a, /localhost=> /
> localhost :41414] BOUND: / localhost :41414
> 2012-08-02 16:59:02,466 INFO ipc.NettyServer: [id: 0x45cbda0a,
> /localhost:3770 => / localhost :41414] CONNECTED: / localhost :3770
> 2012-08-02 16:59:04,006 INFO hdfs.BucketWriter: Creating hdfs:// localhost
> :8020/data/cssplogs/FlumeData.1343906943264.tmp
> 2012-08-02 16:59:04,167 ERROR serialization.EventSerializerFactory: Unable to
> instantiate Builder from
> org.apache.flume.serialization.CustomLogAvroEventSerializer
> 2012-08-02 16:59:04,168 WARN hdfs.HDFSEventSink: HDFS IO error
> java.io.IOException: java.lang.NullPointerException
> at
> org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:202)
> at
> org.apache.flume.sink.hdfs.BucketWriter.access$000(BucketWriter.java:48)
> at
> org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:155)
> at
> org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:152)
> at
> org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:125)
> at org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:152)
> at
> org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:307)
> at
> org.apache.flume.sink.hdfs.HDFSEventSink$1.call(HDFSEventSink.java:717)
> at
> org.apache.flume.sink.hdfs.HDFSEventSink$1.call(HDFSEventSink.java:714)
> at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
> at java.util.concurrent.FutureTask.run(FutureTask.java:138)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> at java.lang.Thread.run(Thread.java:662)
> Caused by: java.lang.NullPointerException
> at
> org.apache.flume.sink.hdfs.HDFSDataStream.open(HDFSDataStream.java:75)
> at
> org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:188)
> ... 13 more
> 2012-08-02 16:59:05,239 INFO hdfs.BucketWriter: Creating hdfs:// localhost
> :8020/data/cssplogs/FlumeData.1343906943265.tmp
> 2012-08-02 16:59:05,392 ERROR serialization.EventSerializerFactory: Unable to
> instantiate Builder from
> org.apache.flume.serialization.CustomLogAvroEventSerializer
> 2012-08-02 16:59:05,392 WARN hdfs.HDFSEventSink: HDFS IO error
> java.io.IOException: java.lang.NullPointerException
> at
> org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:202)
> at
> org.apache.flume.sink.hdfs.BucketWriter.access$000(BucketWriter.java:48)
> at
> org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:155)
> at
> org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:152)
> at
> org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:125)
> at org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:152)
> at
> org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:307)
> at
> org.apache.flume.sink.hdfs.HDFSEventSink$1.call(HDFSEventSink.java:717)
> at
> org.apache.flume.sink.hdfs.HDFSEventSink$1.call(HDFSEventSink.java:714)
> at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
> at java.util.concurrent.FutureTask.run(FutureTask.java:138)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> at java.lang.Thread.run(Thread.java:662)
>
> -----------------------------------------------------------------------------------------------------------------
>
>
> This is my Avro schema file
>
> { "type": "record", "name": "LogEvent", "namespace":
> "org.apache.flume.serialization",
> "fields": [
> { "name": "srno", "type": "int" },
> { "name": "severity", "type": "int" },
> { "name": "timestamp", "type": "long" },
> { "name": "hostname", "type": "string" },
> { "name": "message", "type": "string" }
> ]
> }
>
> ------------------------------------------------------------------------------------------------
>
> This is the LogEvent class generated using the Avro Maven plugin and slightly customized
>
> @SuppressWarnings("all")
> public class LogEvent extends SpecificRecordBase implements SpecificRecord {
> public static final Schema _SCHEMA =
> Schema.parse("{\"type\":\"record\",\"name\":\"LogEvent\",\"namespace\":\"org.apache.flume.serialization\",\"fields\":[{\"name\":\"srno\",\"type\":\"int\"},{\"name\":\"severity\",\"type\":\"int\"},{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"hostname\",\"type\":\"string\"},{\"name\":\"message\",\"type\":\"string\"}]}");
> public int srno;
> public String severity;
> public long timestamp;
> public String hostname;
> public String message;
>
> public Schema getSchema() { return _SCHEMA; }
> public Object get(int _field) {
> switch (_field) {
> case 0: return srno;
> case 1: return severity;
> case 2: return timestamp;
> case 3: return hostname;
> case 4: return message;
> default: throw new AvroRuntimeException("Bad index");
> }
> }
>
> @SuppressWarnings(value="unchecked")
> public void set(int _field, Object _value) {
> switch (_field) {
> case 0: srno = (Integer)_value; break;
> case 1: severity = (String)_value; break;
> case 2: timestamp = (Long)_value; break;
> case 3: hostname = (String)_value; break;
> case 4: message = (String)_value; break;
> default: throw new AvroRuntimeException("Bad index");
> }
> }
>
> public void setSrno(int srno) {
> this.srno = srno;
> }
> public void setSeverity(String s) {
> severity = s;
> }
> public String getSeverity() {
> return severity;
> }
>
> public void setTimestamp(long t) {
> timestamp = t;
> }
>
> public long getTimestamp() {
> return timestamp;
> }
>
> public void setHostname(String h) {
> hostname = h;
> }
>
> public String getHostname() {
> return hostname;
> }
>
> public void setMessage(String m) {
> message = m;
> }
>
> public String getMessage() {
> return message;
> }
>
> @Override
> public void put(int field, Object value) {
> // TODO Auto-generated method stub
>
> }
> }
> -----------------------------------------------------------------------------------------------------------------------------------------------------------------------
> agent2.sources = seqGenSrc
> agent2.channels = memoryChannel
> agent2.sinks = loggerSink
>
>
> agent2.sources.seqGenSrc.type = avro
> agent2.sources.seqGenSrc.bind=slcso-poc2-lnx
> agent2.sources.seqGenSrc.port=41414
>
> #agent2.sources.seqGenSrc.interceptors = time hostInterceptor
> #agent2.sources.seqGenSrc.interceptors.hostInterceptor.type =
> org.apache.flume.interceptor.HostInterceptor$Builder
> #agent2.sources.seqGenSrc.interceptors.hostInterceptor.hostHeader = host
> #agent2.sources.seqGenSrc.interceptors.hostInterceptor.useIP = false
> #agent2.sources.seqGenSrc.interceptors.hostInterceptor.hostHeader.preserveExisting
> = false
> #agent2.sources.seqGenSrc.interceptors.time.type =
> org.apache.flume.interceptor.TimestampInterceptor$Builder
>
> agent2.channels.memoryChannel.type = memory
> agent2.channels.memoryChannel.capacity = 1000000
> agent2.channels.memoryChannel.transactionCapacity = 1000000
> agent2.channels.memoryChannel.keep-alive = 30
>
> agent2.sources.seqGenSrc.channels = memoryChannel
>
> agent2.sinks.loggerSink.type = hdfs
> #agent2.sinks.loggerSink.hdfs.path = hdfs://10.105.39.204:8020/data/CspcLogs
> agent2.sinks.loggerSink.hdfs.path = hdfs://slcso-poc4-lnx:8020/data/cssplogs
> agent2.sinks.loggerSink.hdfs.fileType = DataStream
> #agent2.sinks.loggerSink.hdfs.writeFormat = Text
>
> agent2.sinks.loggerSink.channel = memoryChannel
> #agent2.sinks.loggerSink.serializer =
> org.apache.flume.serialization.BodyTextEventSerializer
> #agent2.sinks.loggerSink.serializer = avro_event
> agent2.sinks.loggerSink.serializer =
> org.apache.flume.serialization.CustomLogAvroEventSerializer
> agent2.sinks.loggerSink.serializer.compressionCodec = snappy
> #agent2.sinks.loggerSink.serializer.syncIntervalBytes = 2048000
> agent2.channels.memoryChannel.type = memory
> ~
>
>
> -----------------------------------------------------------------------------------------------------------------
> The following is my class
>
> public class CustomLogAvroEventSerializer extends
> AbstractAvroEventSerializer<LogEvent> {
>
> private static final DateTimeFormatter dateFmt1 =
> DateTimeFormat.forPattern("MMM dd HH:mm:ss").withZoneUTC();
>
> private static final DateTimeFormatter dateFmt2 =
> DateTimeFormat.forPattern("MMM d HH:mm:ss").withZoneUTC();
>
>
> private static final Logger logger =
> LoggerFactory.getLogger(CustomLogAvroEventSerializer.class);
>
> private final OutputStream out;
> private final Schema schema;
>
> public CustomLogAvroEventSerializer(OutputStream out) throws
> IOException {
> this.out = out;
> this.schema =new LogEvent().getSchema();;
> }
>
> @Override
> protected OutputStream getOutputStream() {
> return out;
> }
>
> @Override
> protected Schema getSchema() {
> return schema;
> }
>
> // very simple rfc3164 parser
> @Override
> protected LogEvent convert(Event event) {
> LogEvent sle = new LogEvent();
>
> // Stringify body so it's easy to parse.
> // This is a pretty inefficient way to do it.
> String msg = new String(event.getBody(), Charsets.UTF_8);
>
> // parser read pointer
> int seek = 0;
>
> // Check Flume headers to see if we came from SyslogTcp(or UDP)
> Source,
> // which at the time of this writing only parses the priority.
> // This is a bit schizophrenic and it should parse all the fields or
> none.
> Map<String, String> headers = event.getHeaders();
> boolean fromSyslogSource = false;
> if (headers.containsKey(SyslogUtils.SYSLOG_SRNO)) {
> fromSyslogSource = true;
> int srno = Integer.parseInt(headers.get("srno"));
> sle.setSrno(srno);
> }else{
> sle.setSrno(121);
> }
> if (headers.containsKey(SyslogUtils.SYSLOG_SEVERITY)) {
> fromSyslogSource = true;
> String severity = headers.get(SyslogUtils.SYSLOG_SEVERITY);
> sle.setSeverity(severity);
> }
>
> // assume the message was received raw (maybe via NetcatSource)
> // parse the priority string
> if (!fromSyslogSource) {
> if (msg.charAt(0) == '<') {
> int end = msg.indexOf(">");
> if (end > -1) {
> seek = end + 1;
> String priStr = msg.substring(1, end);
> // int priority = Integer.parseInt(priStr);
> // String severity = priStr;
>
> sle.setSeverity(priStr);
> }
> }
> }
>
> // parse the timestamp
> String timestampStr = msg.substring(seek, seek + 15);
> long ts = parseRfc3164Date(timestampStr);
> if (ts != 0) {
> sle.setTimestamp(ts);
> seek += 15 + 1; // space after timestamp
> }
>
> // parse the hostname
> int nextSpace = msg.indexOf(' ', seek);
> if (nextSpace > -1) {
> String hostname = msg.substring(seek, nextSpace);
> sle.setHostname(hostname);
> seek = nextSpace + 1;
> }
>
> // everything else is the message
> String actualMessage = msg.substring(seek);
> sle.setMessage(actualMessage);
>
> logger.debug("Serialized event as: {}", sle);
>
> return sle;
> }
>
> private static long parseRfc3164Date(String in) {
> DateTime date = null;
> try {
> date = dateFmt1.parseDateTime(in);
> } catch (IllegalArgumentException e) {
> // ignore the exception, we act based on nullity of date object
> logger.debug("Date parse failed on ({}), trying single-digit
> date", in);
> }
>
> if (date == null) {
> try {
> date = dateFmt2.parseDateTime(in);
> } catch (IllegalArgumentException e) {
> // ignore the exception, we act based on nullity of date
> object
> logger.debug("2nd date parse failed on ({}), unknown date
> format", in);
> }
> }
>
> // hacky stuff to try and deal with boundary cases, i.e. new
> year's eve.
> // rfc3164 dates are really dumb.
> // NB: cannot handle replaying of old logs or going back to the
> future
> if (date != null) {
> DateTime now = new DateTime();
> int year = now.getYear();
> DateTime corrected = date.withYear(year);
>
> // flume clock is ahead or there is some latency, and the year
> rolled
> if (corrected.isAfter(now) &&
> corrected.minusMonths(1).isAfter(now)) {
> corrected = date.withYear(year - 1);
> // flume clock is behind and the year rolled
> } else if (corrected.isBefore(now) &&
> corrected.plusMonths(1).isBefore(now)) {
> corrected = date.withYear(year + 1);
> }
> date = corrected;
> }
>
> if (date == null) {
> return 0;
> }
>
> return date.getMillis();
> }
>
> public static class Builder implements EventSerializer.Builder {
>
> @Override
> public EventSerializer build(Context context, OutputStream out) {
> CustomLogAvroEventSerializer writer = null;
> try {
> writer = new CustomLogAvroEventSerializer(out);
> writer.configure(context);
> } catch (IOException e) {
> logger.error("Unable to parse schema file. Exception
> follows.", e);
> }
> return writer;
> }
>
> }
>
>
>
> }
>
> Please suggest a fix; I need output like the following, and I want to customize it like log4j
> -------------------------------------------------------------------------------------------------------------------------------------
> 172 [main] FATAL com.cisco.flume.FlumeTest - Sample fatal message
> 188 [main] INFO com.cisco.flume.FlumeTest - Sample info message1
> 203 [main] WARN com.cisco.flume.FlumeTest - Sample warn message
> 219 [main] INFO com.cisco.flume.FlumeTest - Sample info message2
> 219 [main] INFO com.cisco.flume.FlumeTest - Sample info message3
> 266 [main] ERROR com.cisco.flume.FlumeTest - Sample error message
> 282 [main] FATAL com.cisco.flume.FlumeTest - Sample fatal message
> 282 [main] INFO com.cisco.flume.FlumeTest - Sample info message4
>
>
> --
> JP
>
>
>
> --
> JP