Do you have a custom jar containing
org.apache.flume.serialization.CustomLogAvroEventSerializer on the Flume classpath?
If not, copy the custom jar file into the <FLUME_HOME>/lib directory.
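
One other thing worth double-checking in the posted config: per the Flume User Guide, the `serializer` property takes either a built-in alias (such as avro_event) or the fully qualified class name of an implementation of the EventSerializer.Builder interface. Since your Builder is a nested class, that would mean appending `$Builder` to the class name. A suggested (untested) change to the posted config:

```properties
# Suggested change (untested): name the nested Builder class explicitly,
# since Flume instantiates the Builder, not the serializer itself.
agent2.sinks.loggerSink.serializer = org.apache.flume.serialization.CustomLogAvroEventSerializer$Builder
```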

ERROR serialization.EventSerializerFactory: Unable to instantiate Builder from org.apache.flume.serialization.CustomLogAvroEventSerializer

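For context, serializer builders are resolved by class name via reflection, so this error means the named class could not be found or instantiated, and the factory then returns null, which later surfaces as the NullPointerException in the HDFS sink. A simplified sketch of that lookup (not Flume's actual code; `BuilderLookup` and `MockBuilder` are stand-ins):

```java
// Simplified sketch of resolving a builder by class name via reflection.
// If the class is missing from the classpath, or does not implement the
// builder interface, lookup fails and null is returned -- analogous to the
// "Unable to instantiate Builder" error followed by the NPE downstream.
public class BuilderLookup {
    // Stand-in for org.apache.flume.serialization.EventSerializer.Builder
    public interface Builder { String build(); }

    public static class MockBuilder implements Builder {
        public String build() { return "ok"; }
    }

    static Builder lookup(String className) {
        try {
            Class<?> c = Class.forName(className);
            return (Builder) c.getDeclaredConstructor().newInstance();
        } catch (Exception e) {
            // This is where a factory would log the failure and give up.
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(lookup("BuilderLookup$MockBuilder") != null);   // true
        System.out.println(lookup("com.example.Missing$Builder") != null); // false
    }
}
```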
On Aug 2, 2012, at 8:26 PM, JP wrote:

> Hi,
> 
> I'm getting these errors:
> 
> 2012-08-02 16:58:50,065 INFO source.AvroSource: Avro source seqGenSrc started.
> 2012-08-02 16:59:02,463 INFO ipc.NettyServer: [id: 0x45cbda0a, /localhost:3770 => /localhost:41414] OPEN
> 2012-08-02 16:59:02,466 INFO ipc.NettyServer: [id: 0x45cbda0a, /localhost:3770 => /localhost:41414] BOUND: /localhost:41414
> 2012-08-02 16:59:02,466 INFO ipc.NettyServer: [id: 0x45cbda0a, /localhost:3770 => /localhost:41414] CONNECTED: /localhost:3770
> 2012-08-02 16:59:04,006 INFO hdfs.BucketWriter: Creating hdfs://localhost:8020/data/cssplogs/FlumeData.1343906943264.tmp
> 2012-08-02 16:59:04,167 ERROR serialization.EventSerializerFactory: Unable to instantiate Builder from org.apache.flume.serialization.CustomLogAvroEventSerializer
> 2012-08-02 16:59:04,168 WARN hdfs.HDFSEventSink: HDFS IO error
> java.io.IOException: java.lang.NullPointerException
>         at org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:202)
>         at org.apache.flume.sink.hdfs.BucketWriter.access$000(BucketWriter.java:48)
>         at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:155)
>         at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:152)
>         at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:125)
>         at org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:152)
>         at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:307)
>         at org.apache.flume.sink.hdfs.HDFSEventSink$1.call(HDFSEventSink.java:717)
>         at org.apache.flume.sink.hdfs.HDFSEventSink$1.call(HDFSEventSink.java:714)
>         at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>         at java.lang.Thread.run(Thread.java:662)
> Caused by: java.lang.NullPointerException
>         at org.apache.flume.sink.hdfs.HDFSDataStream.open(HDFSDataStream.java:75)
>         at org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:188)
>         ... 13 more
> 2012-08-02 16:59:05,239 INFO hdfs.BucketWriter: Creating hdfs://localhost:8020/data/cssplogs/FlumeData.1343906943265.tmp
> 2012-08-02 16:59:05,392 ERROR serialization.EventSerializerFactory: Unable to instantiate Builder from org.apache.flume.serialization.CustomLogAvroEventSerializer
> 2012-08-02 16:59:05,392 WARN hdfs.HDFSEventSink: HDFS IO error
> java.io.IOException: java.lang.NullPointerException
>         (stack trace identical to the first occurrence above)
> 
> -----------------------------------------------------------------------------------------------------------------
> 
> 
> This is my Avro schema:
> 
> {
>   "type": "record",
>   "name": "LogEvent",
>   "namespace": "org.apache.flume.serialization",
>   "fields": [
>     { "name": "srno",      "type": "int" },
>     { "name": "severity",  "type": "int" },
>     { "name": "timestamp", "type": "long" },
>     { "name": "hostname",  "type": "string" },
>     { "name": "message",   "type": "string" }
>   ]
> }
> 
> ------------------------------------------------------------------------------------------------
> 
> This is the LogEvent class, generated with the Avro Maven plugin and slightly customized:
> 
> @SuppressWarnings("all")
> public class LogEvent extends SpecificRecordBase implements SpecificRecord {
>   public static final Schema _SCHEMA = Schema.parse("{\"type\":\"record\",\"name\":\"LogEvent\",\"namespace\":\"org.apache.flume.serialization\",\"fields\":[{\"name\":\"srno\",\"type\":\"int\"},{\"name\":\"severity\",\"type\":\"int\"},{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"hostname\",\"type\":\"string\"},{\"name\":\"message\",\"type\":\"string\"}]}");
>   public int srno;
>   public String severity;
>   public long timestamp;
>   public String hostname;
>   public String message;
>   
>   public Schema getSchema() { return _SCHEMA; }
>   public Object get(int _field) {
>     switch (_field) {
>     case 0: return srno;
>     case 1: return severity;
>     case 2: return timestamp;
>     case 3: return hostname;
>     case 4: return message;
>     default: throw new AvroRuntimeException("Bad index");
>     }
>   }
>   
>   @SuppressWarnings(value="unchecked")
>   public void set(int _field, Object _value) {
>     switch (_field) {
>     case 0: srno = (Integer)_value; break;
>     case 1: severity = (String)_value; break;
>     case 2: timestamp = (Long)_value; break;
>     case 3: hostname = (String)_value; break;
>     case 4: message = (String)_value; break;
>     default: throw new AvroRuntimeException("Bad index");
>     }
>   }
>   
>     public void setSrno(int srno) {
>         this.srno = srno;
>     }
>     public void setSeverity(String s) { 
>         severity = s; 
>     }
>     public String getSeverity() { 
>         return severity; 
>     }
> 
>     public void setTimestamp(long t) {
>         timestamp = t;
>     }
> 
>     public long getTimestamp() {
>         return timestamp;
>     }
> 
>     public void setHostname(String h) {
>         hostname = h;
>     }
> 
>     public String getHostname() {
>         return hostname;
>     }
> 
>     public void setMessage(String m) {
>         message = m;
>     }
> 
>     public String getMessage() {
>         return message;
>     }
> 
> @Override
> public void put(int field, Object value) {
>     // TODO Auto-generated method stub
>     
> }
> }
> -----------------------------------------------------------------------------------------------------------------------------------------------------------------------
> agent2.sources = seqGenSrc
> agent2.channels = memoryChannel
> agent2.sinks = loggerSink
> 
> 
> agent2.sources.seqGenSrc.type = avro
> agent2.sources.seqGenSrc.bind=slcso-poc2-lnx
> agent2.sources.seqGenSrc.port=41414
> 
> #agent2.sources.seqGenSrc.interceptors = time hostInterceptor
> #agent2.sources.seqGenSrc.interceptors.hostInterceptor.type = org.apache.flume.interceptor.HostInterceptor$Builder
> #agent2.sources.seqGenSrc.interceptors.hostInterceptor.hostHeader = host
> #agent2.sources.seqGenSrc.interceptors.hostInterceptor.useIP = false
> #agent2.sources.seqGenSrc.interceptors.hostInterceptor.hostHeader.preserveExisting = false
> #agent2.sources.seqGenSrc.interceptors.time.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
> 
> agent2.channels.memoryChannel.type = memory
> agent2.channels.memoryChannel.capacity = 1000000
> agent2.channels.memoryChannel.transactionCapacity = 1000000
> agent2.channels.memoryChannel.keep-alive = 30
> 
> agent2.sources.seqGenSrc.channels = memoryChannel
> 
> agent2.sinks.loggerSink.type = hdfs
> #agent2.sinks.loggerSink.hdfs.path = hdfs://10.105.39.204:8020/data/CspcLogs
> agent2.sinks.loggerSink.hdfs.path = hdfs://slcso-poc4-lnx:8020/data/cssplogs
> agent2.sinks.loggerSink.hdfs.fileType = DataStream
> #agent2.sinks.loggerSink.hdfs.writeFormat = Text
> 
> agent2.sinks.loggerSink.channel = memoryChannel
> #agent2.sinks.loggerSink.serializer = org.apache.flume.serialization.BodyTextEventSerializer
> #agent2.sinks.loggerSink.serializer = avro_event
> agent2.sinks.loggerSink.serializer = org.apache.flume.serialization.CustomLogAvroEventSerializer
> agent2.sinks.loggerSink.serializer.compressionCodec = snappy
> #agent2.sinks.loggerSink.serializer.syncIntervalBytes = 2048000
> agent2.channels.memoryChannel.type = memory
> 
> 
> -----------------------------------------------------------------------------------------------------------------
> The following is my serializer class:
> 
> public class CustomLogAvroEventSerializer extends
>         AbstractAvroEventSerializer<LogEvent> {
>     
>       private static final DateTimeFormatter dateFmt1 =
>           DateTimeFormat.forPattern("MMM dd HH:mm:ss").withZoneUTC();
>       
>       private static final DateTimeFormatter dateFmt2 =
>           DateTimeFormat.forPattern("MMM  d HH:mm:ss").withZoneUTC();
> 
>       
>       private static final Logger logger =
>           LoggerFactory.getLogger(CustomLogAvroEventSerializer.class);
>       
>        private final OutputStream out;
>       private final Schema schema;
> 
>       public CustomLogAvroEventSerializer(OutputStream out) throws IOException {
>         this.out = out;
>         this.schema = new LogEvent().getSchema();
>       }
> 
>       @Override
>       protected OutputStream getOutputStream() {
>         return out;
>       }
> 
>       @Override
>       protected Schema getSchema() {
>         return schema;
>       }
> 
>       // very simple rfc3164 parser
>       @Override
>       protected LogEvent convert(Event event) {
>           LogEvent sle = new LogEvent();
> 
>         // Stringify body so it's easy to parse.
>         // This is a pretty inefficient way to do it.
>         String msg = new String(event.getBody(), Charsets.UTF_8);
> 
>         // parser read pointer
>         int seek = 0;
> 
>         // Check Flume headers to see if we came from a SyslogTcp (or UDP) source,
>         // which at the time of this writing only parses the priority.
>         // This is a bit schizophrenic and it should parse all the fields or none.
>         Map<String, String> headers = event.getHeaders();
>         boolean fromSyslogSource = false;
>         if (headers.containsKey(SyslogUtils.SYSLOG_SRNO)) {
>           fromSyslogSource = true;
>           int srno = Integer.parseInt(headers.get("srno"));
>           sle.setSrno(srno);
>         }else{
>             sle.setSrno(121);
>         }
>         if (headers.containsKey(SyslogUtils.SYSLOG_SEVERITY)) {
>           fromSyslogSource = true;
>           String severity = headers.get(SyslogUtils.SYSLOG_SEVERITY);
>           sle.setSeverity(severity);
>         }
> 
>         // assume the message was received raw (maybe via NetcatSource)
>         // parse the priority string
>         if (!fromSyslogSource) {
>           if (msg.charAt(0) == '<') {
>             int end = msg.indexOf(">");
>             if (end > -1) {
>               seek = end + 1;
>               String priStr = msg.substring(1, end);
>              // int priority = Integer.parseInt(priStr);
>              // String severity = priStr;
>           
>               sle.setSeverity(priStr);
>             }
>           }
>         }
> 
>         // parse the timestamp
>         String timestampStr = msg.substring(seek, seek + 15);
>         long ts = parseRfc3164Date(timestampStr);
>         if (ts != 0) {
>           sle.setTimestamp(ts);
>           seek += 15 + 1; // space after timestamp
>         }
> 
>         // parse the hostname
>         int nextSpace = msg.indexOf(' ', seek);
>         if (nextSpace > -1) {
>           String hostname = msg.substring(seek, nextSpace);
>           sle.setHostname(hostname);
>           seek = nextSpace + 1;
>         }
> 
>         // everything else is the message
>         String actualMessage = msg.substring(seek);
>         sle.setMessage(actualMessage);
> 
>         logger.debug("Serialized event as: {}", sle);
> 
>         return sle;
>       }
>       
>       private static long parseRfc3164Date(String in) {
>             DateTime date = null;
>             try {
>               date = dateFmt1.parseDateTime(in);
>             } catch (IllegalArgumentException e) {
>               // ignore the exception, we act based on nullity of date object
>               logger.debug("Date parse failed on ({}), trying single-digit date", in);
>             }
> 
>             if (date == null) {
>               try {
>                 date = dateFmt2.parseDateTime(in);
>               } catch (IllegalArgumentException e) {
>                 // ignore the exception, we act based on nullity of date object
>                 logger.debug("2nd date parse failed on ({}), unknown date format", in);
>               }
>             }
> 
>             // hacky stuff to try and deal with boundary cases, i.e. new year's eve.
>             // rfc3164 dates are really dumb.
>             // NB: cannot handle replaying of old logs or going back to the future
>             if (date != null) {
>               DateTime now = new DateTime();
>               int year = now.getYear();
>               DateTime corrected = date.withYear(year);
> 
>               // flume clock is ahead or there is some latency, and the year rolled
>               if (corrected.isAfter(now) && corrected.minusMonths(1).isAfter(now)) {
>                 corrected = date.withYear(year - 1);
>               // flume clock is behind and the year rolled
>               } else if (corrected.isBefore(now) && corrected.plusMonths(1).isBefore(now)) {
>                 corrected = date.withYear(year + 1);
>               }
>               date = corrected;
>             }
> 
>             if (date == null) {
>               return 0;
>             }
> 
>             return date.getMillis();
>           }
>       
>       public static class Builder implements EventSerializer.Builder {
> 
>             @Override
>             public EventSerializer build(Context context, OutputStream out) {
>                 CustomLogAvroEventSerializer writer = null;
>               try {
>                 writer = new CustomLogAvroEventSerializer(out);
>                 writer.configure(context);
>               } catch (IOException e) {
>                 logger.error("Unable to parse schema file. Exception follows.", e);
>               }
>               return writer;
>             }
> 
>     }
>       
> 
> 
> }
> 
> Please advise: I need output like the following, and I want to customize it like log4j.
> -------------------------------------------------------------------------------------------------------------------------------------
> 172  [main] FATAL com.cisco.flume.FlumeTest  - Sample fatal message
> 188  [main] INFO  com.cisco.flume.FlumeTest  - Sample info message1
> 203  [main] WARN  com.cisco.flume.FlumeTest  - Sample warn message
> 219  [main] INFO  com.cisco.flume.FlumeTest  - Sample info message2
> 219  [main] INFO  com.cisco.flume.FlumeTest  - Sample info message3
> 266  [main] ERROR com.cisco.flume.FlumeTest  - Sample error message
> 282  [main] FATAL com.cisco.flume.FlumeTest  - Sample fatal message
> 282  [main] INFO  com.cisco.flume.FlumeTest  - Sample info message4
> 
> 
> -- 
> JP
