Thanks Mubarak, I have added the jar to the lib directory.
After that only im getting the exception. Any help to resolve this issue. On Fri, Aug 3, 2012 at 9:05 AM, Mubarak Seyed <[email protected]> wrote: > Do you have a custom jar which contains > org.apache.flume.serialization.CustomLogAvroEventSerializer in flume > classpath? You can copy the custom jar file to <FLUME_HOME>/lib directory. > > ERROR serialization.EventSerializerFactory: Unable to instantiate Builder > from org.apache.flume.serialization.CustomLogAvroEventSerializer > > On Aug 2, 2012, at 8:26 PM, JP wrote: > > HI, > > Im getting errros > > 2012-08-02 16:58:50,065 INFO source.AvroSource: Avro source seqGenSrc > started. > 2012-08-02 16:59:02,463 INFO ipc.NettyServer: [id: 0x45cbda0a, > /localhost=> / localhost 41414 <http://10.105.39.202:41414/>] OPEN > 2012-08-02 16:59:02,466 INFO ipc.NettyServer: [id: 0x45cbda0a, > /localhost=> / localhost :41414 <http://10.105.39.202:41414/>] BOUND: / > localhost :41414 <http://10.105.39.202:41414/> > 2012-08-02 16:59:02,466 INFO ipc.NettyServer: [id: 0x45cbda0a, /localhost > :3770 <http://10.77.235.245:3770/> => / localhost > :41414<http://10.105.39.202:41414/>] > CONNECTED: / localhost :3770 <http://10.77.235.245:3770/> > 2012-08-02 16:59:04,006 INFO hdfs.BucketWriter: Creating hdfs:// > localhost :8020/data/cssplogs/FlumeData.1343906943264.tmp > 2012-08-02 16:59:04,167 ERROR serialization.EventSerializerFactory: Unable > to instantiate Builder from > org.apache.flume.serialization.CustomLogAvroEventSerializer > 2012-08-02 16:59:04,168 WARN hdfs.HDFSEventSink: HDFS IO error > java.io.IOException: java.lang.NullPointerException > at > org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:202) > at > org.apache.flume.sink.hdfs.BucketWriter.access$000(BucketWriter.java:48) > at > org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:155) > at > org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:152) > at > org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:125) > 
at > org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:152) > at > org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:307) > at > org.apache.flume.sink.hdfs.HDFSEventSink$1.call(HDFSEventSink.java:717) > at > org.apache.flume.sink.hdfs.HDFSEventSink$1.call(HDFSEventSink.java:714) > at > java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303) > at java.util.concurrent.FutureTask.run(FutureTask.java:138) > at > java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) > at java.lang.Thread.run(Thread.java:662) > Caused by: java.lang.NullPointerException > at > org.apache.flume.sink.hdfs.HDFSDataStream.open(HDFSDataStream.java:75) > at > org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:188) > ... 13 more > 2012-08-02 16:59:05,239 INFO hdfs.BucketWriter: Creating hdfs:// > localhost :8020/data/cssplogs/FlumeData.1343906943265.tmp > 2012-08-02 16:59:05,392 ERROR serialization.EventSerializerFactory: Unable > to instantiate Builder from > org.apache.flume.serialization.CustomLogAvroEventSerializer > 2012-08-02 16:59:05,392 WARN hdfs.HDFSEventSink: HDFS IO error > java.io.IOException: java.lang.NullPointerException > at > org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:202) > at > org.apache.flume.sink.hdfs.BucketWriter.access$000(BucketWriter.java:48) > at > org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:155) > at > org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:152) > at > org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:125) > at > org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:152) > at > org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:307) > at > org.apache.flume.sink.hdfs.HDFSEventSink$1.call(HDFSEventSink.java:717) > at > org.apache.flume.sink.hdfs.HDFSEventSink$1.call(HDFSEventSink.java:714) > 
at > java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303) > at java.util.concurrent.FutureTask.run(FutureTask.java:138) > at > java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) > at java.lang.Thread.run(Thread.java:662) > > > ----------------------------------------------------------------------------------------------------------------- > > > This is my avro file > > { "type": "record", "name": "LogEvent", "namespace": > "org.apache.flume.serialization", > "fields": [ > { "name": "srno", "type": "int" }, > { "name": "severity", "type": "int" }, > { "name": "timestamp", "type": "long" }, > { "name": "hostname", "type": "string" }, > { "name": "message", "type": "string" } > ] > } > > > ------------------------------------------------------------------------------------------------ > > This is the LogEvent created using maven-avro and little customized > > @SuppressWarnings("all") > public class LogEvent extends SpecificRecordBase implements SpecificRecord > { > public static final Schema _SCHEMA = > Schema.parse("{\"type\":\"record\",\"name\":\"LogEvent\",\"namespace\":\"org.apache.flume.serialization\",\"fields\":[{\"name\":\"srno\",\"type\":\"int\"},{\"name\":\"severity\",\"type\":\"int\"},{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"hostname\",\"type\":\"string\"},{\"name\":\"message\",\"type\":\"string\"}]}"); > public int srno; > public String severity; > public long timestamp; > public String hostname; > public String message; > > public Schema getSchema() { return _SCHEMA; } > public Object get(int _field) { > switch (_field) { > case 0: return srno; > case 1: return severity; > case 2: return timestamp; > case 3: return hostname; > case 4: return message; > default: throw new AvroRuntimeException("Bad index"); > } > } > > @SuppressWarnings(value="unchecked") > public void set(int _field, Object _value) { > switch 
(_field) { > case 0: srno = (Integer)_value; break; > case 1: severity = (String)_value; break; > case 2: timestamp = (Long)_value; break; > case 3: hostname = (String)_value; break; > case 4: message = (String)_value; break; > default: throw new AvroRuntimeException("Bad index"); > } > } > > public void setSrno(int srno) { > this.srno = srno; > } > public void setSeverity(String s) { > severity = s; > } > public String getSeverity() { > return severity; > } > > public void setTimestamp(long t) { > timestamp = t; > } > > public long getTimestamp() { > return timestamp; > } > > public void setHostname(String h) { > hostname = h; > } > > public String getHostname() { > return hostname; > } > > public void setMessage(String m) { > message = m; > } > > public String getMessage() { > return message; > } > > @Override > public void put(int field, Object value) { > // TODO Auto-generated method stub > > } > } > > ----------------------------------------------------------------------------------------------------------------------------------------------------------------------- > agent2.sources = seqGenSrc > agent2.channels = memoryChannel > agent2.sinks = loggerSink > > > agent2.sources.seqGenSrc.type = avro > agent2.sources.seqGenSrc.bind=slcso-poc2-lnx > agent2.sources.seqGenSrc.port=41414 > > #agent2.sources.seqGenSrc.interceptors = time hostInterceptor > #agent2.sources.seqGenSrc.interceptors.hostInterceptor.type = > org.apache.flume.interceptor.HostInterceptor$Builder > #agent2.sources.seqGenSrc.interceptors.hostInterceptor.hostHeader = host > #agent2.sources.seqGenSrc.interceptors.hostInterceptor.useIP = false > #agent2.sources.seqGenSrc.interceptors.hostInterceptor.hostHeader.preserveExisting > = false > #agent2.sources.seqGenSrc.interceptors.time.type = > org.apache.flume.interceptor.TimestampInterceptor$Builder > > agent2.channels.memoryChannel.type = memory > agent2.channels.memoryChannel.capacity = 1000000 > agent2.channels.memoryChannel.transactionCapacity = 
1000000 > agent2.channels.memoryChannel.keep-alive = 30 > > agent2.sources.seqGenSrc.channels = memoryChannel > > agent2.sinks.loggerSink.type = hdfs > #agent2.sinks.loggerSink.hdfs.path = hdfs:// > 10.105.39.204:8020/data/CspcLogs > agent2.sinks.loggerSink.hdfs.path = > hdfs://slcso-poc4-lnx:8020/data/cssplogs > agent2.sinks.loggerSink.hdfs.fileType = DataStream > #agent2.sinks.loggerSink.hdfs.writeFormat = Text > > agent2.sinks.loggerSink.channel = memoryChannel > #agent2.sinks.loggerSink.serializer = > org.apache.flume.serialization.BodyTextEventSerializer > #agent2.sinks.loggerSink.serializer = avro_event > agent2.sinks.loggerSink.serializer = > org.apache.flume.serialization.CustomLogAvroEventSerializer > agent2.sinks.loggerSink.serializer.compressionCodec = snappy > #agent2.sinks.loggerSink.serializer.syncIntervalBytes = 2048000 > agent2.channels.memoryChannel.type = memory > ~ > > > > ----------------------------------------------------------------------------------------------------------------- > The following is my class > > public class CustomLogAvroEventSerializer extends > AbstractAvroEventSerializer<LogEvent> { > > private static final DateTimeFormatter dateFmt1 = > DateTimeFormat.forPattern("MMM dd HH:mm:ss").withZoneUTC(); > > private static final DateTimeFormatter dateFmt2 = > DateTimeFormat.forPattern("MMM d HH:mm:ss").withZoneUTC(); > > > private static final Logger logger = > LoggerFactory.getLogger(CustomLogAvroEventSerializer.class); > > private final OutputStream out; > private final Schema schema; > > public CustomLogAvroEventSerializer(OutputStream out) throws > IOException { > this.out = out; > this.schema =new LogEvent().getSchema();; > } > > @Override > protected OutputStream getOutputStream() { > return out; > } > > @Override > protected Schema getSchema() { > return schema; > } > > // very simple rfc3164 parser > @Override > protected LogEvent convert(Event event) { > LogEvent sle = new LogEvent(); > > // Stringify body so it's easy to 
parse. > // This is a pretty inefficient way to do it. > String msg = new String(event.getBody(), Charsets.UTF_8); > > // parser read pointer > int seek = 0; > > // Check Flume headers to see if we came from SyslogTcp(or UDP) > Source, > // which at the time of this writing only parses the priority. > // This is a bit schizophrenic and it should parse all the fields > or none. > Map<String, String> headers = event.getHeaders(); > boolean fromSyslogSource = false; > if (headers.containsKey(SyslogUtils.SYSLOG_SRNO)) { > fromSyslogSource = true; > int srno = Integer.parseInt(headers.get("srno")); > sle.setSrno(srno); > }else{ > sle.setSrno(121); > } > if (headers.containsKey(SyslogUtils.SYSLOG_SEVERITY)) { > fromSyslogSource = true; > String severity = headers.get(SyslogUtils.SYSLOG_SEVERITY); > sle.setSeverity(severity); > } > > // assume the message was received raw (maybe via NetcatSource) > // parse the priority string > if (!fromSyslogSource) { > if (msg.charAt(0) == '<') { > int end = msg.indexOf(">"); > if (end > -1) { > seek = end + 1; > String priStr = msg.substring(1, end); > // int priority = Integer.parseInt(priStr); > // String severity = priStr; > > sle.setSeverity(priStr); > } > } > } > > // parse the timestamp > String timestampStr = msg.substring(seek, seek + 15); > long ts = parseRfc3164Date(timestampStr); > if (ts != 0) { > sle.setTimestamp(ts); > seek += 15 + 1; // space after timestamp > } > > // parse the hostname > int nextSpace = msg.indexOf(' ', seek); > if (nextSpace > -1) { > String hostname = msg.substring(seek, nextSpace); > sle.setHostname(hostname); > seek = nextSpace + 1; > } > > // everything else is the message > String actualMessage = msg.substring(seek); > sle.setMessage(actualMessage); > > logger.debug("Serialized event as: {}", sle); > > return sle; > } > > private static long parseRfc3164Date(String in) { > DateTime date = null; > try { > date = dateFmt1.parseDateTime(in); > } catch (IllegalArgumentException e) { > // ignore the 
exception, we act based on nullity of date > object > logger.debug("Date parse failed on ({}), trying single-digit > date", in); > } > > if (date == null) { > try { > date = dateFmt2.parseDateTime(in); > } catch (IllegalArgumentException e) { > // ignore the exception, we act based on nullity of date > object > logger.debug("2nd date parse failed on ({}), unknown date > format", in); > } > } > > // hacky stuff to try and deal with boundary cases, i.e. new > year's eve. > // rfc3164 dates are really dumb. > // NB: cannot handle replaying of old logs or going back to > the future > if (date != null) { > DateTime now = new DateTime(); > int year = now.getYear(); > DateTime corrected = date.withYear(year); > > // flume clock is ahead or there is some latency, and the > year rolled > if (corrected.isAfter(now) && > corrected.minusMonths(1).isAfter(now)) { > corrected = date.withYear(year - 1); > // flume clock is behind and the year rolled > } else if (corrected.isBefore(now) && > corrected.plusMonths(1).isBefore(now)) { > corrected = date.withYear(year + 1); > } > date = corrected; > } > > if (date == null) { > return 0; > } > > return date.getMillis(); > } > > public static class Builder implements EventSerializer.Builder { > > @Override > public EventSerializer build(Context context, OutputStream > out) { > CustomLogAvroEventSerializer writer = null; > try { > writer = new CustomLogAvroEventSerializer(out); > writer.configure(context); > } catch (IOException e) { > logger.error("Unable to parse schema file. 
Exception > follows.", e); > } > return writer; > } > > } > > > > } > > Please suggest me i need output like this and i want to customize like > log4j > > ------------------------------------------------------------------------------------------------------------------------------------- > 172 [main] FATAL com.cisco.flume.FlumeTest - Sample fatal message > 188 [main] INFO com.cisco.flume.FlumeTest - Sample info message1 > 203 [main] WARN com.cisco.flume.FlumeTest - Sample warn message > 219 [main] INFO com.cisco.flume.FlumeTest - Sample info message2 > 219 [main] INFO com.cisco.flume.FlumeTest - Sample info message3 > 266 [main] ERROR com.cisco.flume.FlumeTest - Sample error message > 282 [main] FATAL com.cisco.flume.FlumeTest - Sample fatal message > 282 [main] INFO com.cisco.flume.FlumeTest - Sample info message4 > > > -- > JP > > > > -- > JP > > > -- JP
