Can you please try with:

>> agent2.sinks.loggerSink.serializer = org.apache.flume.serialization.CustomLogAvroEventSerializer$Builder
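
For reference, the serializer property expects the fully qualified name of an EventSerializer.Builder implementation, which is why the nested Builder class (the $Builder suffix) is needed rather than the serializer class itself. A minimal sketch of the sink section, reusing the names and paths from your config below:

  agent2.sinks.loggerSink.type = hdfs
  agent2.sinks.loggerSink.hdfs.path = hdfs://slcso-poc4-lnx:8020/data/cssplogs
  agent2.sinks.loggerSink.hdfs.fileType = DataStream
  agent2.sinks.loggerSink.channel = memoryChannel
  # Note the $Builder suffix: EventSerializerFactory instantiates this class.
  agent2.sinks.loggerSink.serializer = org.apache.flume.serialization.CustomLogAvroEventSerializer$Builder
  agent2.sinks.loggerSink.serializer.compressionCodec = snappy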

On Aug 2, 2012, at 8:39 PM, JP wrote:

> Thanks Mubarak,
> 
> I have added the jar to lib.
> 
> Even after that, I am still getting the exception.
> 
> Any help resolving this issue would be appreciated.
> 
> On Fri, Aug 3, 2012 at 9:05 AM, Mubarak Seyed <[email protected]> wrote:
> Do you have a custom jar containing org.apache.flume.serialization.CustomLogAvroEventSerializer on the Flume classpath? You can copy the custom jar file to the <FLUME_HOME>/lib directory.
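> For example (custom-serializer.jar is a hypothetical name; use whatever your build produces): cp /path/to/custom-serializer.jar <FLUME_HOME>/lib/ and then restart the agent so the class is picked up.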
> 
> ERROR serialization.EventSerializerFactory: Unable to instantiate Builder from org.apache.flume.serialization.CustomLogAvroEventSerializer
> 
> On Aug 2, 2012, at 8:26 PM, JP wrote:
> 
>> Hi,
>> 
>> I'm getting the following errors:
>> 
>> 2012-08-02 16:58:50,065 INFO source.AvroSource: Avro source seqGenSrc started.
>> 2012-08-02 16:59:02,463 INFO ipc.NettyServer: [id: 0x45cbda0a, /localhost => /localhost:41414] OPEN
>> 2012-08-02 16:59:02,466 INFO ipc.NettyServer: [id: 0x45cbda0a, /localhost => /localhost:41414] BOUND: /localhost:41414
>> 2012-08-02 16:59:02,466 INFO ipc.NettyServer: [id: 0x45cbda0a, /localhost:3770 => /localhost:41414] CONNECTED: /localhost:3770
>> 2012-08-02 16:59:04,006 INFO hdfs.BucketWriter: Creating hdfs://localhost:8020/data/cssplogs/FlumeData.1343906943264.tmp
>> 2012-08-02 16:59:04,167 ERROR serialization.EventSerializerFactory: Unable to instantiate Builder from org.apache.flume.serialization.CustomLogAvroEventSerializer
>> 2012-08-02 16:59:04,168 WARN hdfs.HDFSEventSink: HDFS IO error
>> java.io.IOException: java.lang.NullPointerException
>>         at org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:202)
>>         at org.apache.flume.sink.hdfs.BucketWriter.access$000(BucketWriter.java:48)
>>         at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:155)
>>         at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:152)
>>         at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:125)
>>         at org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:152)
>>         at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:307)
>>         at org.apache.flume.sink.hdfs.HDFSEventSink$1.call(HDFSEventSink.java:717)
>>         at org.apache.flume.sink.hdfs.HDFSEventSink$1.call(HDFSEventSink.java:714)
>>         at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>>         at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>>         at java.lang.Thread.run(Thread.java:662)
>> Caused by: java.lang.NullPointerException
>>         at org.apache.flume.sink.hdfs.HDFSDataStream.open(HDFSDataStream.java:75)
>>         at org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:188)
>>         ... 13 more
>> 2012-08-02 16:59:05,239 INFO hdfs.BucketWriter: Creating hdfs://localhost:8020/data/cssplogs/FlumeData.1343906943265.tmp
>> 2012-08-02 16:59:05,392 ERROR serialization.EventSerializerFactory: Unable to instantiate Builder from org.apache.flume.serialization.CustomLogAvroEventSerializer
>> 2012-08-02 16:59:05,392 WARN hdfs.HDFSEventSink: HDFS IO error
>> java.io.IOException: java.lang.NullPointerException
>>         at org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:202)
>>         ... (remaining stack trace identical to the one above)
>> 
>> -----------------------------------------------------------------------------------------------------------------
>> 
>> 
>> This is my Avro schema file:
>> 
>> {
>>   "type": "record",
>>   "name": "LogEvent",
>>   "namespace": "org.apache.flume.serialization",
>>   "fields": [
>>     { "name": "srno",      "type": "int" },
>>     { "name": "severity",  "type": "string" },
>>     { "name": "timestamp", "type": "long" },
>>     { "name": "hostname",  "type": "string" },
>>     { "name": "message",   "type": "string" }
>>   ]
>> }
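>> 
>> A quick way to sanity-check the schema file is to parse it standalone (a minimal sketch; "logevent.avsc" is a hypothetical file name for the schema above):
>> 
>> import java.io.File;
>> import org.apache.avro.Schema;
>> 
>> public class SchemaCheck {
>>   public static void main(String[] args) throws Exception {
>>     // Schema.Parser throws if the definition is malformed
>>     Schema schema = new Schema.Parser().parse(new File("logevent.avsc"));
>>     System.out.println(schema.toString(true)); // pretty-printed schema
>>   }
>> }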
>> 
>> ------------------------------------------------------------------------------------------------
>> 
>> This is the LogEvent class, generated with the Avro Maven plugin and slightly customized:
>> 
>> package org.apache.flume.serialization;
>> 
>> import org.apache.avro.AvroRuntimeException;
>> import org.apache.avro.Schema;
>> import org.apache.avro.specific.SpecificRecord;
>> import org.apache.avro.specific.SpecificRecordBase;
>> 
>> @SuppressWarnings("all")
>> public class LogEvent extends SpecificRecordBase implements SpecificRecord {
>>   // Note: severity is declared "string" here to match the String field below
>>   public static final Schema _SCHEMA = Schema.parse("{\"type\":\"record\",\"name\":\"LogEvent\",\"namespace\":\"org.apache.flume.serialization\",\"fields\":[{\"name\":\"srno\",\"type\":\"int\"},{\"name\":\"severity\",\"type\":\"string\"},{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"hostname\",\"type\":\"string\"},{\"name\":\"message\",\"type\":\"string\"}]}");
>>   public int srno;
>>   public String severity;
>>   public long timestamp;
>>   public String hostname;
>>   public String message;
>>   
>>   public Schema getSchema() { return _SCHEMA; }
>>   public Object get(int _field) {
>>     switch (_field) {
>>     case 0: return srno;
>>     case 1: return severity;
>>     case 2: return timestamp;
>>     case 3: return hostname;
>>     case 4: return message;
>>     default: throw new AvroRuntimeException("Bad index");
>>     }
>>   }
>>   
>>   @SuppressWarnings(value="unchecked")
>>   public void set(int _field, Object _value) {
>>     switch (_field) {
>>     case 0: srno = (Integer)_value; break;
>>     case 1: severity = (String)_value; break;
>>     case 2: timestamp = (Long)_value; break;
>>     case 3: hostname = (String)_value; break;
>>     case 4: message = (String)_value; break;
>>     default: throw new AvroRuntimeException("Bad index");
>>     }
>>   }
>>   
>>     public void setSrno(int srno) {
>>         this.srno = srno;
>>     }
>>     public void setSeverity(String s) { 
>>         severity = s; 
>>     }
>>     public String getSeverity() { 
>>         return severity; 
>>     }
>> 
>>     public void setTimestamp(long t) {
>>         timestamp = t;
>>     }
>> 
>>     public long getTimestamp() {
>>         return timestamp;
>>     }
>> 
>>     public void setHostname(String h) {
>>         hostname = h;
>>     }
>> 
>>     public String getHostname() {
>>         return hostname;
>>     }
>> 
>>     public void setMessage(String m) {
>>         message = m;
>>     }
>> 
>>     public String getMessage() {
>>         return message;
>>     }
>> 
>>   @Override
>>   public void put(int field, Object value) {
>>     // SpecificRecordBase populates fields through put(); delegating to set()
>>     // keeps the empty auto-generated stub from silently dropping data.
>>     set(field, value);
>>   }
>> }
>> -----------------------------------------------------------------------------------------------------------------------------------------------------------------------
>> agent2.sources = seqGenSrc
>> agent2.channels = memoryChannel
>> agent2.sinks = loggerSink
>> 
>> 
>> agent2.sources.seqGenSrc.type = avro
>> agent2.sources.seqGenSrc.bind=slcso-poc2-lnx
>> agent2.sources.seqGenSrc.port=41414
>> 
>> #agent2.sources.seqGenSrc.interceptors = time hostInterceptor
>> #agent2.sources.seqGenSrc.interceptors.hostInterceptor.type = org.apache.flume.interceptor.HostInterceptor$Builder
>> #agent2.sources.seqGenSrc.interceptors.hostInterceptor.hostHeader = host
>> #agent2.sources.seqGenSrc.interceptors.hostInterceptor.useIP = false
>> #agent2.sources.seqGenSrc.interceptors.hostInterceptor.hostHeader.preserveExisting = false
>> #agent2.sources.seqGenSrc.interceptors.time.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
>> 
>> agent2.channels.memoryChannel.type = memory
>> agent2.channels.memoryChannel.capacity = 1000000
>> agent2.channels.memoryChannel.transactionCapacity = 1000000
>> agent2.channels.memoryChannel.keep-alive = 30
>> 
>> agent2.sources.seqGenSrc.channels = memoryChannel
>> 
>> agent2.sinks.loggerSink.type = hdfs
>> #agent2.sinks.loggerSink.hdfs.path = hdfs://10.105.39.204:8020/data/CspcLogs
>> agent2.sinks.loggerSink.hdfs.path = hdfs://slcso-poc4-lnx:8020/data/cssplogs
>> agent2.sinks.loggerSink.hdfs.fileType = DataStream
>> #agent2.sinks.loggerSink.hdfs.writeFormat = Text
>> 
>> agent2.sinks.loggerSink.channel = memoryChannel
>> #agent2.sinks.loggerSink.serializer = org.apache.flume.serialization.BodyTextEventSerializer
>> #agent2.sinks.loggerSink.serializer = avro_event
>> agent2.sinks.loggerSink.serializer = org.apache.flume.serialization.CustomLogAvroEventSerializer
>> agent2.sinks.loggerSink.serializer.compressionCodec = snappy
>> #agent2.sinks.loggerSink.serializer.syncIntervalBytes = 2048000
>> 
>> 
>> -----------------------------------------------------------------------------------------------------------------
>> The following is my serializer class:
>> 
>> package org.apache.flume.serialization;
>> 
>> import java.io.IOException;
>> import java.io.OutputStream;
>> import java.util.Map;
>> 
>> import org.apache.avro.Schema;
>> import org.apache.flume.Context;
>> import org.apache.flume.Event;
>> import org.apache.flume.source.SyslogUtils;
>> import org.joda.time.DateTime;
>> import org.joda.time.format.DateTimeFormat;
>> import org.joda.time.format.DateTimeFormatter;
>> import org.slf4j.Logger;
>> import org.slf4j.LoggerFactory;
>> 
>> import com.google.common.base.Charsets;
>> 
>> public class CustomLogAvroEventSerializer extends
>>         AbstractAvroEventSerializer<LogEvent> {
>> 
>>       private static final DateTimeFormatter dateFmt1 =
>>           DateTimeFormat.forPattern("MMM dd HH:mm:ss").withZoneUTC();
>> 
>>       private static final DateTimeFormatter dateFmt2 =
>>           DateTimeFormat.forPattern("MMM  d HH:mm:ss").withZoneUTC();
>> 
>>       private static final Logger logger =
>>           LoggerFactory.getLogger(CustomLogAvroEventSerializer.class);
>> 
>>       private final OutputStream out;
>>       private final Schema schema;
>> 
>>       public CustomLogAvroEventSerializer(OutputStream out) throws IOException {
>>         this.out = out;
>>         this.schema = new LogEvent().getSchema();
>>       }
>> 
>>       @Override
>>       protected OutputStream getOutputStream() {
>>         return out;
>>       }
>> 
>>       @Override
>>       protected Schema getSchema() {
>>         return schema;
>>       }
>> 
>>       // very simple rfc3164 parser
>>       @Override
>>       protected LogEvent convert(Event event) {
>>         LogEvent sle = new LogEvent();
>> 
>>         // Stringify body so it's easy to parse.
>>         // This is a pretty inefficient way to do it.
>>         String msg = new String(event.getBody(), Charsets.UTF_8);
>> 
>>         // parser read pointer
>>         int seek = 0;
>> 
>>         // Check Flume headers to see if we came from a SyslogTcp (or UDP) source,
>>         // which at the time of this writing only parses the priority.
>>         // This is a bit schizophrenic and it should parse all the fields or none.
>>         Map<String, String> headers = event.getHeaders();
>>         boolean fromSyslogSource = false;
>>         if (headers.containsKey(SyslogUtils.SYSLOG_SRNO)) {
>>           fromSyslogSource = true;
>>           // use the same constant for lookup as for containsKey
>>           int srno = Integer.parseInt(headers.get(SyslogUtils.SYSLOG_SRNO));
>>           sle.setSrno(srno);
>>         } else {
>>           sle.setSrno(121);
>>         }
>>         if (headers.containsKey(SyslogUtils.SYSLOG_SEVERITY)) {
>>           fromSyslogSource = true;
>>           String severity = headers.get(SyslogUtils.SYSLOG_SEVERITY);
>>           sle.setSeverity(severity);
>>         }
>> 
>>         // assume the message was received raw (maybe via NetcatSource)
>>         // parse the priority string
>>         if (!fromSyslogSource) {
>>           if (msg.charAt(0) == '<') {
>>             int end = msg.indexOf(">");
>>             if (end > -1) {
>>               seek = end + 1;
>>               String priStr = msg.substring(1, end);
>>              // int priority = Integer.parseInt(priStr);
>>              // String severity = priStr;
>>           
>>               sle.setSeverity(priStr);
>>             }
>>           }
>>         }
>> 
>>         // parse the timestamp
>>         String timestampStr = msg.substring(seek, seek + 15);
>>         long ts = parseRfc3164Date(timestampStr);
>>         if (ts != 0) {
>>           sle.setTimestamp(ts);
>>           seek += 15 + 1; // space after timestamp
>>         }
>> 
>>         // parse the hostname
>>         int nextSpace = msg.indexOf(' ', seek);
>>         if (nextSpace > -1) {
>>           String hostname = msg.substring(seek, nextSpace);
>>           sle.setHostname(hostname);
>>           seek = nextSpace + 1;
>>         }
>> 
>>         // everything else is the message
>>         String actualMessage = msg.substring(seek);
>>         sle.setMessage(actualMessage);
>> 
>>         logger.debug("Serialized event as: {}", sle);
>> 
>>         return sle;
>>       }
>>       
>>       private static long parseRfc3164Date(String in) {
>>         DateTime date = null;
>>         try {
>>           date = dateFmt1.parseDateTime(in);
>>         } catch (IllegalArgumentException e) {
>>           // ignore the exception, we act based on nullity of date object
>>           logger.debug("Date parse failed on ({}), trying single-digit date", in);
>>         }
>> 
>>         if (date == null) {
>>           try {
>>             date = dateFmt2.parseDateTime(in);
>>           } catch (IllegalArgumentException e) {
>>             // ignore the exception, we act based on nullity of date object
>>             logger.debug("2nd date parse failed on ({}), unknown date format", in);
>>           }
>>         }
>> 
>>         // hacky stuff to try and deal with boundary cases, i.e. new year's eve.
>>         // rfc3164 dates are really dumb.
>>         // NB: cannot handle replaying of old logs or going back to the future
>>         if (date != null) {
>>           DateTime now = new DateTime();
>>           int year = now.getYear();
>>           DateTime corrected = date.withYear(year);
>> 
>>           // flume clock is ahead or there is some latency, and the year rolled
>>           if (corrected.isAfter(now) && corrected.minusMonths(1).isAfter(now)) {
>>             corrected = date.withYear(year - 1);
>>           // flume clock is behind and the year rolled
>>           } else if (corrected.isBefore(now) && corrected.plusMonths(1).isBefore(now)) {
>>             corrected = date.withYear(year + 1);
>>           }
>>           date = corrected;
>>         }
>> 
>>         if (date == null) {
>>           return 0;
>>         }
>> 
>>         return date.getMillis();
>>       }
>>       
>>       public static class Builder implements EventSerializer.Builder {
>> 
>>         @Override
>>         public EventSerializer build(Context context, OutputStream out) {
>>           CustomLogAvroEventSerializer writer = null;
>>           try {
>>             writer = new CustomLogAvroEventSerializer(out);
>>             writer.configure(context);
>>           } catch (IOException e) {
>>             logger.error("Unable to parse schema file. Exception follows.", e);
>>           }
>>           return writer;
>>         }
>>       }
>> }
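>> 
>> To exercise the serializer outside Flume, something along these lines should work (a hypothetical smoke test; SimpleEvent is Flume's stock Event implementation, and the sample body is an assumed rfc3164-style line):
>> 
>> package org.apache.flume.serialization;
>> 
>> import java.io.ByteArrayOutputStream;
>> import org.apache.flume.Context;
>> import org.apache.flume.event.SimpleEvent;
>> import com.google.common.base.Charsets;
>> 
>> public class SerializerSmokeTest {
>>   public static void main(String[] args) throws Exception {
>>     ByteArrayOutputStream out = new ByteArrayOutputStream();
>>     // Build the serializer the same way Flume's EventSerializerFactory would
>>     EventSerializer serializer =
>>         new CustomLogAvroEventSerializer.Builder().build(new Context(), out);
>>     SimpleEvent event = new SimpleEvent();
>>     event.setBody("<13>Aug  2 16:59:04 myhost sample message".getBytes(Charsets.UTF_8));
>>     serializer.afterCreate();
>>     serializer.write(event);
>>     serializer.flush();
>>     serializer.beforeClose();
>>     System.out.println("Serialized " + out.size() + " bytes of Avro data");
>>   }
>> }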
>> 
>> Please advise. I need output like the following, and I want to customize it the way log4j does:
>> -------------------------------------------------------------------------------------------------------------------------------------
>> 172  [main] FATAL com.cisco.flume.FlumeTest  - Sample fatal message
>> 188  [main] INFO  com.cisco.flume.FlumeTest  - Sample info message1
>> 203  [main] WARN  com.cisco.flume.FlumeTest  - Sample warn message
>> 219  [main] INFO  com.cisco.flume.FlumeTest  - Sample info message2
>> 219  [main] INFO  com.cisco.flume.FlumeTest  - Sample info message3
>> 266  [main] ERROR com.cisco.flume.FlumeTest  - Sample error message
>> 282  [main] FATAL com.cisco.flume.FlumeTest  - Sample fatal message
>> 282  [main] INFO  com.cisco.flume.FlumeTest  - Sample info message4
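>> 
>> (That format corresponds to a log4j PatternLayout along the lines of "%r [%t] %-5p %c - %m%n": milliseconds since startup, thread name, level, logger name, message. The pattern is my assumption based on the sample lines above.)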
>> 
>> 
>> -- 
>> JP
