Hi, I'm getting errors
2012-08-02 16:58:50,065 INFO source.AvroSource: Avro source seqGenSrc started. 2012-08-02 16:59:02,463 INFO ipc.NettyServer: [id: 0x45cbda0a, /localhost=> / localhost 41414 <http://10.105.39.202:41414>] OPEN 2012-08-02 16:59:02,466 INFO ipc.NettyServer: [id: 0x45cbda0a, /localhost=> / localhost :41414 <http://10.105.39.202:41414>] BOUND: / localhost :41414<http://10.105.39.202:41414> 2012-08-02 16:59:02,466 INFO ipc.NettyServer: [id: 0x45cbda0a, /localhost :3770 <http://10.77.235.245:3770> => / localhost :41414<http://10.105.39.202:41414>] CONNECTED: / localhost :3770 <http://10.77.235.245:3770> 2012-08-02 16:59:04,006 INFO hdfs.BucketWriter: Creating hdfs:// localhost :8020/data/cssplogs/FlumeData.1343906943264.tmp 2012-08-02 16:59:04,167 ERROR serialization.EventSerializerFactory: Unable to instantiate Builder from org.apache.flume.serialization.CustomLogAvroEventSerializer 2012-08-02 16:59:04,168 WARN hdfs.HDFSEventSink: HDFS IO error java.io.IOException: java.lang.NullPointerException at org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:202) at org.apache.flume.sink.hdfs.BucketWriter.access$000(BucketWriter.java:48) at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:155) at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:152) at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:125) at org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:152) at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:307) at org.apache.flume.sink.hdfs.HDFSEventSink$1.call(HDFSEventSink.java:717) at org.apache.flume.sink.hdfs.HDFSEventSink$1.call(HDFSEventSink.java:714) at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303) at java.util.concurrent.FutureTask.run(FutureTask.java:138) at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) at 
java.lang.Thread.run(Thread.java:662) Caused by: java.lang.NullPointerException at org.apache.flume.sink.hdfs.HDFSDataStream.open(HDFSDataStream.java:75) at org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:188) ... 13 more 2012-08-02 16:59:05,239 INFO hdfs.BucketWriter: Creating hdfs:// localhost :8020/data/cssplogs/FlumeData.1343906943265.tmp 2012-08-02 16:59:05,392 ERROR serialization.EventSerializerFactory: Unable to instantiate Builder from org.apache.flume.serialization.CustomLogAvroEventSerializer 2012-08-02 16:59:05,392 WARN hdfs.HDFSEventSink: HDFS IO error java.io.IOException: java.lang.NullPointerException at org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:202) at org.apache.flume.sink.hdfs.BucketWriter.access$000(BucketWriter.java:48) at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:155) at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:152) at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:125) at org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:152) at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:307) at org.apache.flume.sink.hdfs.HDFSEventSink$1.call(HDFSEventSink.java:717) at org.apache.flume.sink.hdfs.HDFSEventSink$1.call(HDFSEventSink.java:714) at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303) at java.util.concurrent.FutureTask.run(FutureTask.java:138) at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) at java.lang.Thread.run(Thread.java:662) ----------------------------------------------------------------------------------------------------------------- This is my avro file { "type": "record", "name": "LogEvent", "namespace": "org.apache.flume.serialization", "fields": [ { "name": "srno", "type": "int" }, { "name": "severity", "type": "int" }, { "name": "timestamp", "type": "long" }, { 
"name": "hostname", "type": "string" }, { "name": "message", "type": "string" } ] } ------------------------------------------------------------------------------------------------ This is the LogEvent created using maven-avro and little customized

// NOTE(review): the hand-edited generated class below had three defects that
// line up with the failures in the log:
//  1. _SCHEMA declared severity as "int" while the field, the accessors, and
//     the serializer all use String. The embedded schema is changed to
//     "string" here; the standalone .avsc file must be updated to match —
//     TODO confirm which type you actually want and keep both in sync.
//  2. put(int, Object) — the method Avro's SpecificRecord API actually calls —
//     was an empty TODO stub; set(int, Object) is not part of the API. put()
//     now delegates to set(), so fields are populated during deserialization.
//  3. Avro hands string fields over as CharSequence (org.apache.avro.util.Utf8),
//     so a raw (String) cast can throw ClassCastException; use toString().
@SuppressWarnings("all")
public class LogEvent extends SpecificRecordBase implements SpecificRecord {

  /** Record schema; severity is "string" to match the accessors below. */
  public static final Schema _SCHEMA = Schema.parse("{\"type\":\"record\",\"name\":\"LogEvent\",\"namespace\":\"org.apache.flume.serialization\",\"fields\":[{\"name\":\"srno\",\"type\":\"int\"},{\"name\":\"severity\",\"type\":\"string\"},{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"hostname\",\"type\":\"string\"},{\"name\":\"message\",\"type\":\"string\"}]}");

  public int srno;
  public String severity;
  public long timestamp;
  public String hostname;
  public String message;

  /** @return the compiled-in record schema. */
  public Schema getSchema() {
    return _SCHEMA;
  }

  /** Positional getter used by Avro's datum writer. */
  public Object get(int _field) {
    switch (_field) {
      case 0: return srno;
      case 1: return severity;
      case 2: return timestamp;
      case 3: return hostname;
      case 4: return message;
      default: throw new AvroRuntimeException("Bad index");
    }
  }

  /** Positional setter; retained for existing callers. */
  @SuppressWarnings(value = "unchecked")
  public void set(int _field, Object _value) {
    switch (_field) {
      case 0: srno = (Integer) _value; break;
      // toString() instead of (String) cast: Avro may pass Utf8 instances.
      case 1: severity = _value.toString(); break;
      case 2: timestamp = (Long) _value; break;
      case 3: hostname = _value.toString(); break;
      case 4: message = _value.toString(); break;
      default: throw new AvroRuntimeException("Bad index");
    }
  }

  public void setSrno(int srno) { this.srno = srno; }
  public void setSeverity(String s) { severity = s; }
  public String getSeverity() { return severity; }
  public void setTimestamp(long t) { timestamp = t; }
  public long getTimestamp() { return timestamp; }
  public void setHostname(String h) { hostname = h; }
  public String getHostname() { return hostname; }
  public void setMessage(String m) { message = m; }
  public String getMessage() { return message; }

  /**
   * Positional setter required by the SpecificRecord API.
   * This was an empty TODO stub, so Avro could never populate any field;
   * it now delegates to {@link #set(int, Object)}.
   */
  @Override
  public void put(int field, Object value) {
    set(field, value);
  }
}
----------------------------------------------------------------------------------------------------------------------------------------------------------------------- agent2.sources = seqGenSrc agent2.channels = memoryChannel agent2.sinks = loggerSink agent2.sources.seqGenSrc.type = avro agent2.sources.seqGenSrc.bind=slcso-poc2-lnx agent2.sources.seqGenSrc.port=41414 #agent2.sources.seqGenSrc.interceptors = time hostInterceptor #agent2.sources.seqGenSrc.interceptors.hostInterceptor.type = org.apache.flume.interceptor.HostInterceptor$Builder #agent2.sources.seqGenSrc.interceptors.hostInterceptor.hostHeader = host #agent2.sources.seqGenSrc.interceptors.hostInterceptor.useIP = false #agent2.sources.seqGenSrc.interceptors.hostInterceptor.hostHeader.preserveExisting = false #agent2.sources.seqGenSrc.interceptors.time.type = org.apache.flume.interceptor.TimestampInterceptor$Builder agent2.channels.memoryChannel.type = memory agent2.channels.memoryChannel.capacity = 1000000 agent2.channels.memoryChannel.transactionCapacity = 1000000 agent2.channels.memoryChannel.keep-alive = 30 agent2.sources.seqGenSrc.channels = memoryChannel agent2.sinks.loggerSink.type = hdfs #agent2.sinks.loggerSink.hdfs.path = hdfs://10.105.39.204:8020/data/CspcLogs agent2.sinks.loggerSink.hdfs.path = hdfs://slcso-poc4-lnx:8020/data/cssplogs agent2.sinks.loggerSink.hdfs.fileType = DataStream #agent2.sinks.loggerSink.hdfs.writeFormat = Text agent2.sinks.loggerSink.channel = memoryChannel #agent2.sinks.loggerSink.serializer = org.apache.flume.serialization.BodyTextEventSerializer #agent2.sinks.loggerSink.serializer = avro_event agent2.sinks.loggerSink.serializer = org.apache.flume.serialization.CustomLogAvroEventSerializer agent2.sinks.loggerSink.serializer.compressionCodec = snappy #agent2.sinks.loggerSink.serializer.syncIntervalBytes = 2048000 agent2.channels.memoryChannel.type = memory ~
----------------------------------------------------------------------------------------------------------------- The following is my class public class CustomLogAvroEventSerializer extends AbstractAvroEventSerializer<LogEvent> { private static final DateTimeFormatter dateFmt1 = DateTimeFormat.forPattern("MMM dd HH:mm:ss").withZoneUTC(); private static final DateTimeFormatter dateFmt2 = DateTimeFormat.forPattern("MMM d HH:mm:ss").withZoneUTC(); private static final Logger logger = LoggerFactory.getLogger(CustomLogAvroEventSerializer.class); private final OutputStream out; private final Schema schema; public CustomLogAvroEventSerializer(OutputStream out) throws IOException { this.out = out; this.schema =new LogEvent().getSchema();; } @Override protected OutputStream getOutputStream() { return out; } @Override protected Schema getSchema() { return schema; } // very simple rfc3164 parser @Override protected LogEvent convert(Event event) { LogEvent sle = new LogEvent(); // Stringify body so it's easy to parse. // This is a pretty inefficient way to do it. String msg = new String(event.getBody(), Charsets.UTF_8); // parser read pointer int seek = 0; // Check Flume headers to see if we came from SyslogTcp(or UDP) Source, // which at the time of this writing only parses the priority. // This is a bit schizophrenic and it should parse all the fields or none. 
Map<String, String> headers = event.getHeaders(); boolean fromSyslogSource = false; if (headers.containsKey(SyslogUtils.SYSLOG_SRNO)) { fromSyslogSource = true; int srno = Integer.parseInt(headers.get("srno")); sle.setSrno(srno); }else{ sle.setSrno(121); } if (headers.containsKey(SyslogUtils.SYSLOG_SEVERITY)) { fromSyslogSource = true; String severity = headers.get(SyslogUtils.SYSLOG_SEVERITY); sle.setSeverity(severity); } // assume the message was received raw (maybe via NetcatSource) // parse the priority string if (!fromSyslogSource) { if (msg.charAt(0) == '<') { int end = msg.indexOf(">"); if (end > -1) { seek = end + 1; String priStr = msg.substring(1, end); // int priority = Integer.parseInt(priStr); // String severity = priStr; sle.setSeverity(priStr); } } } // parse the timestamp String timestampStr = msg.substring(seek, seek + 15); long ts = parseRfc3164Date(timestampStr); if (ts != 0) { sle.setTimestamp(ts); seek += 15 + 1; // space after timestamp } // parse the hostname int nextSpace = msg.indexOf(' ', seek); if (nextSpace > -1) { String hostname = msg.substring(seek, nextSpace); sle.setHostname(hostname); seek = nextSpace + 1; } // everything else is the message String actualMessage = msg.substring(seek); sle.setMessage(actualMessage); logger.debug("Serialized event as: {}", sle); return sle; } private static long parseRfc3164Date(String in) { DateTime date = null; try { date = dateFmt1.parseDateTime(in); } catch (IllegalArgumentException e) { // ignore the exception, we act based on nullity of date object logger.debug("Date parse failed on ({}), trying single-digit date", in); } if (date == null) { try { date = dateFmt2.parseDateTime(in); } catch (IllegalArgumentException e) { // ignore the exception, we act based on nullity of date object logger.debug("2nd date parse failed on ({}), unknown date format", in); } } // hacky stuff to try and deal with boundary cases, i.e. new year's eve. // rfc3164 dates are really dumb. 
// NB: cannot handle replaying of old logs or going back to the future if (date != null) { DateTime now = new DateTime(); int year = now.getYear(); DateTime corrected = date.withYear(year); // flume clock is ahead or there is some latency, and the year rolled if (corrected.isAfter(now) && corrected.minusMonths(1).isAfter(now)) { corrected = date.withYear(year - 1); // flume clock is behind and the year rolled } else if (corrected.isBefore(now) && corrected.plusMonths(1).isBefore(now)) { corrected = date.withYear(year + 1); } date = corrected; } if (date == null) { return 0; } return date.getMillis(); } public static class Builder implements EventSerializer.Builder { @Override public EventSerializer build(Context context, OutputStream out) { CustomLogAvroEventSerializer writer = null; try { writer = new CustomLogAvroEventSerializer(out); writer.configure(context); } catch (IOException e) { logger.error("Unable to parse schema file. Exception follows.", e); } return writer; } } } Please suggest me i need output like this and i want to customize like log4j ------------------------------------------------------------------------------------------------------------------------------------- 172 [main] FATAL com.cisco.flume.FlumeTest - Sample fatal message 188 [main] INFO com.cisco.flume.FlumeTest - Sample info message1 203 [main] WARN com.cisco.flume.FlumeTest - Sample warn message 219 [main] INFO com.cisco.flume.FlumeTest - Sample info message2 219 [main] INFO com.cisco.flume.FlumeTest - Sample info message3 266 [main] ERROR com.cisco.flume.FlumeTest - Sample error message 282 [main] FATAL com.cisco.flume.FlumeTest - Sample fatal message 282 [main] INFO com.cisco.flume.FlumeTest - Sample info message4 -- JP -- JP
