Thanks Mubarak, it is working fine after adding:

agent2.sinks.loggerSink.serializer = org.apache.flume.serialization.CustomLogAvroEventSerializer$Builder
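For anyone hitting the same "Unable to instantiate Builder" error: the config value has to be the Builder's *binary* class name, in which a nested class is joined to its enclosing class with `$` (not `.`). A minimal stand-alone sketch of the reflective lookup a factory like Flume's EventSerializerFactory performs; the classes here are hypothetical stand-ins, not Flume's actual code:

```java
// Hypothetical stand-in classes to illustrate the naming rule.
public class SerializerNameDemo {
    public static class CustomSerializer {
        public static class Builder {
            public String build() { return "serializer-instance"; }
        }
    }

    public static void main(String[] args) throws Exception {
        // The binary name of a nested class joins outer and inner with '$'.
        String builderName = SerializerNameDemo.CustomSerializer.Builder.class.getName();
        System.out.println(builderName);

        // A factory loads the configured name reflectively, so the config value
        // must be this binary name, '$' included; "Outer.Builder" would fail
        // Class.forName with ClassNotFoundException.
        Object builder = Class.forName(builderName).getDeclaredConstructor().newInstance();
        System.out.println(builder instanceof SerializerNameDemo.CustomSerializer.Builder);
    }
}
```

This is why `org.apache.flume.serialization.CustomLogAvroEventSerializer` alone failed while appending `$Builder` worked.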
On Fri, Aug 3, 2012 at 9:23 AM, Mubarak Seyed <[email protected]> wrote:

Can you please try with

agent2.sinks.loggerSink.serializer = org.apache.flume.serialization.CustomLogAvroEventSerializer$Builder

On Aug 2, 2012, at 8:39 PM, JP wrote:

Thanks Mubarak,

I have added the jar to lib. Even after that I am still getting the exception. Any help to resolve this issue?

On Fri, Aug 3, 2012 at 9:05 AM, Mubarak Seyed <[email protected]> wrote:

Do you have a custom jar which contains org.apache.flume.serialization.CustomLogAvroEventSerializer in the Flume classpath? You can copy the custom jar file to the <FLUME_HOME>/lib directory.

ERROR serialization.EventSerializerFactory: Unable to instantiate Builder from org.apache.flume.serialization.CustomLogAvroEventSerializer

On Aug 2, 2012, at 8:26 PM, JP wrote:

Hi,

I'm getting errors:

2012-08-02 16:58:50,065 INFO source.AvroSource: Avro source seqGenSrc started.
2012-08-02 16:59:02,463 INFO ipc.NettyServer: [id: 0x45cbda0a, /10.77.235.245:3770 => /10.105.39.202:41414] OPEN
2012-08-02 16:59:02,466 INFO ipc.NettyServer: [id: 0x45cbda0a, /10.77.235.245:3770 => /10.105.39.202:41414] BOUND: /10.105.39.202:41414
2012-08-02 16:59:02,466 INFO ipc.NettyServer: [id: 0x45cbda0a, /10.77.235.245:3770 => /10.105.39.202:41414] CONNECTED: /10.77.235.245:3770
2012-08-02 16:59:04,006 INFO hdfs.BucketWriter: Creating hdfs://localhost:8020/data/cssplogs/FlumeData.1343906943264.tmp
2012-08-02 16:59:04,167 ERROR serialization.EventSerializerFactory: Unable to instantiate Builder from org.apache.flume.serialization.CustomLogAvroEventSerializer
2012-08-02 16:59:04,168 WARN hdfs.HDFSEventSink: HDFS IO error
java.io.IOException: java.lang.NullPointerException
    at org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:202)
    at org.apache.flume.sink.hdfs.BucketWriter.access$000(BucketWriter.java:48)
    at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:155)
    at org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:152)
    at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:125)
    at org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:152)
    at org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:307)
    at org.apache.flume.sink.hdfs.HDFSEventSink$1.call(HDFSEventSink.java:717)
    at org.apache.flume.sink.hdfs.HDFSEventSink$1.call(HDFSEventSink.java:714)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
    at java.util.concurrent.FutureTask.run(FutureTask.java:138)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:662)
Caused by: java.lang.NullPointerException
    at org.apache.flume.sink.hdfs.HDFSDataStream.open(HDFSDataStream.java:75)
    at org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:188)
    ... 13 more
2012-08-02 16:59:05,239 INFO hdfs.BucketWriter: Creating hdfs://localhost:8020/data/cssplogs/FlumeData.1343906943265.tmp
2012-08-02 16:59:05,392 ERROR serialization.EventSerializerFactory: Unable to instantiate Builder from org.apache.flume.serialization.CustomLogAvroEventSerializer
2012-08-02 16:59:05,392 WARN hdfs.HDFSEventSink: HDFS IO error
java.io.IOException: java.lang.NullPointerException
    [same stack trace as above]

-----------------------------------------------------------------------------------------------------------------

This is my Avro schema file:

{ "type": "record", "name": "LogEvent", "namespace": "org.apache.flume.serialization",
  "fields": [
    { "name": "srno", "type": "int" },
    { "name": "severity", "type": "string" },
    { "name": "timestamp", "type": "long" },
    { "name": "hostname", "type": "string" },
    { "name": "message", "type": "string" }
  ]
}

------------------------------------------------------------------------------------------------

This is the LogEvent class, generated with the Avro Maven plugin and customized a little:

@SuppressWarnings("all")
public class LogEvent extends SpecificRecordBase implements SpecificRecord {
  public static final Schema _SCHEMA = Schema.parse("{\"type\":\"record\",\"name\":\"LogEvent\",\"namespace\":\"org.apache.flume.serialization\",\"fields\":[{\"name\":\"srno\",\"type\":\"int\"},{\"name\":\"severity\",\"type\":\"string\"},{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"hostname\",\"type\":\"string\"},{\"name\":\"message\",\"type\":\"string\"}]}");
  public int srno;
  public String severity;
  public long timestamp;
  public String hostname;
  public String message;

  public Schema getSchema() { return _SCHEMA; }

  public Object get(int _field) {
    switch (_field) {
      case 0: return srno;
      case 1: return severity;
      case 2: return timestamp;
      case 3: return hostname;
      case 4: return message;
      default: throw new AvroRuntimeException("Bad index");
    }
  }

  @SuppressWarnings(value = "unchecked")
  public void set(int _field, Object _value) {
    switch (_field) {
      case 0: srno = (Integer) _value; break;
      case 1: severity = (String) _value; break;
      case 2: timestamp = (Long) _value; break;
      case 3: hostname = (String) _value; break;
      case 4: message = (String) _value; break;
      default: throw new AvroRuntimeException("Bad index");
    }
  }

  public void setSrno(int srno) {
    this.srno = srno;
  }

  public void setSeverity(String s) {
    severity = s;
  }

  public String getSeverity() {
    return severity;
  }

  public void setTimestamp(long t) {
    timestamp = t;
  }

  public long getTimestamp() {
    return timestamp;
  }

  public void setHostname(String h) {
    hostname = h;
  }

  public String getHostname() {
    return hostname;
  }

  public void setMessage(String m) {
    message = m;
  }

  public String getMessage() {
    return message;
  }

  @Override
  public void put(int field, Object value) {
    // delegate to set() so records populated through the SpecificRecord API work too
    set(field, value);
  }
}

-----------------------------------------------------------------------------------------------------------------------------------------------------------------------

agent2.sources = seqGenSrc
agent2.channels = memoryChannel
agent2.sinks = loggerSink

agent2.sources.seqGenSrc.type = avro
agent2.sources.seqGenSrc.bind = slcso-poc2-lnx
agent2.sources.seqGenSrc.port = 41414

#agent2.sources.seqGenSrc.interceptors = time hostInterceptor
#agent2.sources.seqGenSrc.interceptors.hostInterceptor.type = org.apache.flume.interceptor.HostInterceptor$Builder
#agent2.sources.seqGenSrc.interceptors.hostInterceptor.hostHeader = host
#agent2.sources.seqGenSrc.interceptors.hostInterceptor.useIP = false
#agent2.sources.seqGenSrc.interceptors.hostInterceptor.hostHeader.preserveExisting = false
#agent2.sources.seqGenSrc.interceptors.time.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

agent2.channels.memoryChannel.type = memory
agent2.channels.memoryChannel.capacity = 1000000
agent2.channels.memoryChannel.transactionCapacity = 1000000
agent2.channels.memoryChannel.keep-alive = 30

agent2.sources.seqGenSrc.channels = memoryChannel

agent2.sinks.loggerSink.type = hdfs
#agent2.sinks.loggerSink.hdfs.path = hdfs://10.105.39.204:8020/data/CspcLogs
agent2.sinks.loggerSink.hdfs.path = hdfs://slcso-poc4-lnx:8020/data/cssplogs
agent2.sinks.loggerSink.hdfs.fileType = DataStream
#agent2.sinks.loggerSink.hdfs.writeFormat = Text

agent2.sinks.loggerSink.channel = memoryChannel
#agent2.sinks.loggerSink.serializer = org.apache.flume.serialization.BodyTextEventSerializer
#agent2.sinks.loggerSink.serializer = avro_event
agent2.sinks.loggerSink.serializer =
org.apache.flume.serialization.CustomLogAvroEventSerializer
agent2.sinks.loggerSink.serializer.compressionCodec = snappy
#agent2.sinks.loggerSink.serializer.syncIntervalBytes = 2048000
agent2.channels.memoryChannel.type = memory

-----------------------------------------------------------------------------------------------------------------

The following is my class:

public class CustomLogAvroEventSerializer extends AbstractAvroEventSerializer<LogEvent> {

  private static final DateTimeFormatter dateFmt1 =
      DateTimeFormat.forPattern("MMM dd HH:mm:ss").withZoneUTC();
  private static final DateTimeFormatter dateFmt2 =
      DateTimeFormat.forPattern("MMM d HH:mm:ss").withZoneUTC();

  private static final Logger logger =
      LoggerFactory.getLogger(CustomLogAvroEventSerializer.class);

  private final OutputStream out;
  private final Schema schema;

  public CustomLogAvroEventSerializer(OutputStream out) throws IOException {
    this.out = out;
    this.schema = new LogEvent().getSchema();
  }

  @Override
  protected OutputStream getOutputStream() {
    return out;
  }

  @Override
  protected Schema getSchema() {
    return schema;
  }

  // very simple rfc3164 parser
  @Override
  protected LogEvent convert(Event event) {
    LogEvent sle = new LogEvent();

    // Stringify the body so it's easy to parse.
    // This is a pretty inefficient way to do it.
    String msg = new String(event.getBody(), Charsets.UTF_8);

    // parser read pointer
    int seek = 0;

    // Check Flume headers to see if we came from a SyslogTcp (or UDP) source,
    // which at the time of this writing only parses the priority.
    // This is a bit schizophrenic and it should parse all the fields or none.
    Map<String, String> headers = event.getHeaders();
    boolean fromSyslogSource = false;
    if (headers.containsKey(SyslogUtils.SYSLOG_SRNO)) {
      fromSyslogSource = true;
      int srno = Integer.parseInt(headers.get(SyslogUtils.SYSLOG_SRNO));
      sle.setSrno(srno);
    } else {
      sle.setSrno(121);
    }
    if (headers.containsKey(SyslogUtils.SYSLOG_SEVERITY)) {
      fromSyslogSource = true;
      String severity = headers.get(SyslogUtils.SYSLOG_SEVERITY);
      sle.setSeverity(severity);
    }

    // Assume the message was received raw (maybe via NetcatSource)
    // and parse the priority string.
    if (!fromSyslogSource) {
      if (msg.charAt(0) == '<') {
        int end = msg.indexOf(">");
        if (end > -1) {
          seek = end + 1;
          String priStr = msg.substring(1, end);
          sle.setSeverity(priStr);
        }
      }
    }

    // parse the timestamp
    String timestampStr = msg.substring(seek, seek + 15);
    long ts = parseRfc3164Date(timestampStr);
    if (ts != 0) {
      sle.setTimestamp(ts);
      seek += 15 + 1; // space after timestamp
    }

    // parse the hostname
    int nextSpace = msg.indexOf(' ', seek);
    if (nextSpace > -1) {
      String hostname = msg.substring(seek, nextSpace);
      sle.setHostname(hostname);
      seek = nextSpace + 1;
    }

    // everything else is the message
    String actualMessage = msg.substring(seek);
    sle.setMessage(actualMessage);

    logger.debug("Serialized event as: {}", sle);

    return sle;
  }

  private static long parseRfc3164Date(String in) {
    DateTime date = null;
    try {
      date = dateFmt1.parseDateTime(in);
    } catch (IllegalArgumentException e) {
      // ignore the exception, we act based on nullity of the date object
      logger.debug("Date parse failed on ({}), trying single-digit date", in);
    }

    if (date == null) {
      try {
        date = dateFmt2.parseDateTime(in);
      } catch (IllegalArgumentException e) {
        // ignore the exception, we act based on nullity of the date object
        logger.debug("2nd date parse failed on ({}), unknown date format", in);
      }
    }

    // Hacky stuff to try and deal with boundary cases, i.e. New Year's Eve.
    // rfc3164 dates are really dumb.
    // NB: cannot handle replaying of old logs or going back to the future.
    if (date != null) {
      DateTime now = new DateTime();
      int year = now.getYear();
      DateTime corrected = date.withYear(year);

      // flume clock is ahead or there is some latency, and the year rolled
      if (corrected.isAfter(now) && corrected.minusMonths(1).isAfter(now)) {
        corrected = date.withYear(year - 1);
      // flume clock is behind and the year rolled
      } else if (corrected.isBefore(now) && corrected.plusMonths(1).isBefore(now)) {
        corrected = date.withYear(year + 1);
      }
      date = corrected;
    }

    if (date == null) {
      return 0;
    }

    return date.getMillis();
  }

  public static class Builder implements EventSerializer.Builder {

    @Override
    public EventSerializer build(Context context, OutputStream out) {
      CustomLogAvroEventSerializer writer = null;
      try {
        writer = new CustomLogAvroEventSerializer(out);
        writer.configure(context);
      } catch (IOException e) {
        logger.error("Unable to parse schema file. Exception follows.", e);
      }
      return writer;
    }
  }
}

Please advise; I need output like the following, and I want to customize it like log4j:

-------------------------------------------------------------------------------------------------------------------------------------
172 [main] FATAL com.cisco.flume.FlumeTest - Sample fatal message
188 [main] INFO com.cisco.flume.FlumeTest - Sample info message1
203 [main] WARN com.cisco.flume.FlumeTest - Sample warn message
219 [main] INFO com.cisco.flume.FlumeTest - Sample info message2
219 [main] INFO com.cisco.flume.FlumeTest - Sample info message3
266 [main] ERROR com.cisco.flume.FlumeTest - Sample error message
282 [main] FATAL com.cisco.flume.FlumeTest - Sample fatal message
282 [main] INFO com.cisco.flume.FlumeTest - Sample info message4

--
JP
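As a footnote on the log4j-style output requested above: the sample lines follow the shape `<millis> [<thread>] <LEVEL> <logger> - <message>`, so one option is a plain text serializer that formats each parsed event that way instead of writing Avro. A minimal sketch of just the formatting step; the class and method names here are hypothetical, not part of Flume or log4j:

```java
// Hypothetical helper illustrating the "<millis> [<thread>] LEVEL logger - message"
// layout from the sample output above.
public class Log4jStyleFormatter {

    // Renders one event in the sample layout.
    public static String format(long elapsedMillis, String thread, String level,
                                String loggerName, String message) {
        return elapsedMillis + " [" + thread + "] " + level + " "
                + loggerName + " - " + message;
    }

    public static void main(String[] args) {
        System.out.println(format(172, "main", "FATAL",
                "com.cisco.flume.FlumeTest", "Sample fatal message"));
        // -> 172 [main] FATAL com.cisco.flume.FlumeTest - Sample fatal message
    }
}
```

Wiring this into a Flume sink would mean returning such strings from a custom EventSerializer's write path rather than an Avro record.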
