HI,
Im getting errros
2012-08-02 16:58:50,065 INFO source.AvroSource: Avro source seqGenSrc
started.
2012-08-02 16:59:02,463 INFO ipc.NettyServer: [id: 0x45cbda0a, /
10.77.235.245:3770 => /10.105.39.202:41414] OPEN
2012-08-02 16:59:02,466 INFO ipc.NettyServer: [id: 0x45cbda0a, /
10.77.235.245:3770 => /10.105.39.202:41414] BOUND: /10.105.39.202:41414
2012-08-02 16:59:02,466 INFO ipc.NettyServer: [id: 0x45cbda0a, /
10.77.235.245:3770 => /10.105.39.202:41414] CONNECTED: /10.77.235.245:3770
2012-08-02 16:59:04,006 INFO hdfs.BucketWriter: Creating
hdfs://slcso-poc4-lnx:8020/data/cssplogs/FlumeData.1343906943264.tmp
2012-08-02 16:59:04,167 ERROR serialization.EventSerializerFactory: Unable
to instantiate Builder from
org.apache.flume.serialization.CustomLogAvroEventSerializer
2012-08-02 16:59:04,168 WARN hdfs.HDFSEventSink: HDFS IO error
java.io.IOException: java.lang.NullPointerException
at
org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:202)
at
org.apache.flume.sink.hdfs.BucketWriter.access$000(BucketWriter.java:48)
at
org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:155)
at
org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:152)
at
org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:125)
at
org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:152)
at
org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:307)
at
org.apache.flume.sink.hdfs.HDFSEventSink$1.call(HDFSEventSink.java:717)
at
org.apache.flume.sink.hdfs.HDFSEventSink$1.call(HDFSEventSink.java:714)
at
java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
Caused by: java.lang.NullPointerException
at
org.apache.flume.sink.hdfs.HDFSDataStream.open(HDFSDataStream.java:75)
at
org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:188)
... 13 more
2012-08-02 16:59:05,239 INFO hdfs.BucketWriter: Creating
hdfs://slcso-poc4-lnx:8020/data/cssplogs/FlumeData.1343906943265.tmp
2012-08-02 16:59:05,392 ERROR serialization.EventSerializerFactory: Unable
to instantiate Builder from
org.apache.flume.serialization.CustomLogAvroEventSerializer
2012-08-02 16:59:05,392 WARN hdfs.HDFSEventSink: HDFS IO error
java.io.IOException: java.lang.NullPointerException
at
org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:202)
at
org.apache.flume.sink.hdfs.BucketWriter.access$000(BucketWriter.java:48)
at
org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:155)
at
org.apache.flume.sink.hdfs.BucketWriter$1.run(BucketWriter.java:152)
at
org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:125)
at
org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:152)
at
org.apache.flume.sink.hdfs.BucketWriter.append(BucketWriter.java:307)
at
org.apache.flume.sink.hdfs.HDFSEventSink$1.call(HDFSEventSink.java:717)
at
org.apache.flume.sink.hdfs.HDFSEventSink$1.call(HDFSEventSink.java:714)
at
java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
-----------------------------------------------------------------------------------------------------------------
This is my avro file
{ "type": "record", "name": "LogEvent", "namespace":
"org.apache.flume.serialization",
"fields": [
{ "name": "srno", "type": "int" },
{ "name": "severity", "type": "int" },
{ "name": "timestamp", "type": "long" },
{ "name": "hostname", "type": "string" },
{ "name": "message", "type": "string" }
]
}
------------------------------------------------------------------------------------------------
This is the LogEvent created using maven-avro and little customized
@SuppressWarnings("all")
public class LogEvent extends SpecificRecordBase implements SpecificRecord {
public static final Schema _SCHEMA =
Schema.parse("{\"type\":\"record\",\"name\":\"LogEvent\",\"namespace\":\"org.apache.flume.serialization\",\"fields\":[{\"name\":\"srno\",\"type\":\"int\"},{\"name\":\"severity\",\"type\":\"int\"},{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"hostname\",\"type\":\"string\"},{\"name\":\"message\",\"type\":\"string\"}]}");
public int srno;
public String severity;
public long timestamp;
public String hostname;
public String message;
public Schema getSchema() { return _SCHEMA; }
public Object get(int _field) {
switch (_field) {
case 0: return srno;
case 1: return severity;
case 2: return timestamp;
case 3: return hostname;
case 4: return message;
default: throw new AvroRuntimeException("Bad index");
}
}
@SuppressWarnings(value="unchecked")
public void set(int _field, Object _value) {
switch (_field) {
case 0: srno = (Integer)_value; break;
case 1: severity = (String)_value; break;
case 2: timestamp = (Long)_value; break;
case 3: hostname = (String)_value; break;
case 4: message = (String)_value; break;
default: throw new AvroRuntimeException("Bad index");
}
}
public void setSrno(int srno) {
this.srno = srno;
}
public void setSeverity(String s) {
severity = s;
}
public String getSeverity() {
return severity;
}
public void setTimestamp(long t) {
timestamp = t;
}
public long getTimestamp() {
return timestamp;
}
public void setHostname(String h) {
hostname = h;
}
public String getHostname() {
return hostname;
}
public void setMessage(String m) {
message = m;
}
public String getMessage() {
return message;
}
@Override
public void put(int field, Object value) {
// TODO Auto-generated method stub
}
}
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------
agent2.sources = seqGenSrc
agent2.channels = memoryChannel
agent2.sinks = loggerSink
agent2.sources.seqGenSrc.type = avro
agent2.sources.seqGenSrc.bind=slcso-poc2-lnx
agent2.sources.seqGenSrc.port=41414
#agent2.sources.seqGenSrc.interceptors = time hostInterceptor
#agent2.sources.seqGenSrc.interceptors.hostInterceptor.type =
org.apache.flume.interceptor.HostInterceptor$Builder
#agent2.sources.seqGenSrc.interceptors.hostInterceptor.hostHeader = host
#agent2.sources.seqGenSrc.interceptors.hostInterceptor.useIP = false
#agent2.sources.seqGenSrc.interceptors.hostInterceptor.hostHeader.preserveExisting
= false
#agent2.sources.seqGenSrc.interceptors.time.type =
org.apache.flume.interceptor.TimestampInterceptor$Builder
agent2.channels.memoryChannel.type = memory
agent2.channels.memoryChannel.capacity = 1000000
agent2.channels.memoryChannel.transactionCapacity = 1000000
agent2.channels.memoryChannel.keep-alive = 30
agent2.sources.seqGenSrc.channels = memoryChannel
agent2.sinks.loggerSink.type = hdfs
#agent2.sinks.loggerSink.hdfs.path = hdfs://10.105.39.204:8020/data/CspcLogs
agent2.sinks.loggerSink.hdfs.path = hdfs://slcso-poc4-lnx:8020/data/cssplogs
agent2.sinks.loggerSink.hdfs.fileType = DataStream
#agent2.sinks.loggerSink.hdfs.writeFormat = Text
agent2.sinks.loggerSink.channel = memoryChannel
#agent2.sinks.loggerSink.serializer =
org.apache.flume.serialization.BodyTextEventSerializer
#agent2.sinks.loggerSink.serializer = avro_event
agent2.sinks.loggerSink.serializer =
org.apache.flume.serialization.CustomLogAvroEventSerializer
agent2.sinks.loggerSink.serializer.compressionCodec = snappy
#agent2.sinks.loggerSink.serializer.syncIntervalBytes = 2048000
agent2.channels.memoryChannel.type = memory
~
-----------------------------------------------------------------------------------------------------------------
The following is my class
public class CustomLogAvroEventSerializer extends
AbstractAvroEventSerializer<LogEvent> {
private static final DateTimeFormatter dateFmt1 =
DateTimeFormat.forPattern("MMM dd HH:mm:ss").withZoneUTC();
private static final DateTimeFormatter dateFmt2 =
DateTimeFormat.forPattern("MMM d HH:mm:ss").withZoneUTC();
private static final Logger logger =
LoggerFactory.getLogger(CustomLogAvroEventSerializer.class);
private final OutputStream out;
private final Schema schema;
public CustomLogAvroEventSerializer(OutputStream out) throws
IOException {
this.out = out;
this.schema =new LogEvent().getSchema();;
}
@Override
protected OutputStream getOutputStream() {
return out;
}
@Override
protected Schema getSchema() {
return schema;
}
// very simple rfc3164 parser
@Override
protected LogEvent convert(Event event) {
LogEvent sle = new LogEvent();
// Stringify body so it's easy to parse.
// This is a pretty inefficient way to do it.
String msg = new String(event.getBody(), Charsets.UTF_8);
// parser read pointer
int seek = 0;
// Check Flume headers to see if we came from SyslogTcp(or UDP)
Source,
// which at the time of this writing only parses the priority.
// This is a bit schizophrenic and it should parse all the fields
or none.
Map<String, String> headers = event.getHeaders();
boolean fromSyslogSource = false;
if (headers.containsKey(SyslogUtils.SYSLOG_SRNO)) {
fromSyslogSource = true;
int srno = Integer.parseInt(headers.get("srno"));
sle.setSrno(srno);
}else{
sle.setSrno(121);
}
if (headers.containsKey(SyslogUtils.SYSLOG_SEVERITY)) {
fromSyslogSource = true;
String severity = headers.get(SyslogUtils.SYSLOG_SEVERITY);
sle.setSeverity(severity);
}
// assume the message was received raw (maybe via NetcatSource)
// parse the priority string
if (!fromSyslogSource) {
if (msg.charAt(0) == '<') {
int end = msg.indexOf(">");
if (end > -1) {
seek = end + 1;
String priStr = msg.substring(1, end);
// int priority = Integer.parseInt(priStr);
// String severity = priStr;
sle.setSeverity(priStr);
}
}
}
// parse the timestamp
String timestampStr = msg.substring(seek, seek + 15);
long ts = parseRfc3164Date(timestampStr);
if (ts != 0) {
sle.setTimestamp(ts);
seek += 15 + 1; // space after timestamp
}
// parse the hostname
int nextSpace = msg.indexOf(' ', seek);
if (nextSpace > -1) {
String hostname = msg.substring(seek, nextSpace);
sle.setHostname(hostname);
seek = nextSpace + 1;
}
// everything else is the message
String actualMessage = msg.substring(seek);
sle.setMessage(actualMessage);
logger.debug("Serialized event as: {}", sle);
return sle;
}
private static long parseRfc3164Date(String in) {
DateTime date = null;
try {
date = dateFmt1.parseDateTime(in);
} catch (IllegalArgumentException e) {
// ignore the exception, we act based on nullity of date
object
logger.debug("Date parse failed on ({}), trying single-digit
date", in);
}
if (date == null) {
try {
date = dateFmt2.parseDateTime(in);
} catch (IllegalArgumentException e) {
// ignore the exception, we act based on nullity of date
object
logger.debug("2nd date parse failed on ({}), unknown date
format", in);
}
}
// hacky stuff to try and deal with boundary cases, i.e. new
year's eve.
// rfc3164 dates are really dumb.
// NB: cannot handle replaying of old logs or going back to the
future
if (date != null) {
DateTime now = new DateTime();
int year = now.getYear();
DateTime corrected = date.withYear(year);
// flume clock is ahead or there is some latency, and the
year rolled
if (corrected.isAfter(now) &&
corrected.minusMonths(1).isAfter(now)) {
corrected = date.withYear(year - 1);
// flume clock is behind and the year rolled
} else if (corrected.isBefore(now) &&
corrected.plusMonths(1).isBefore(now)) {
corrected = date.withYear(year + 1);
}
date = corrected;
}
if (date == null) {
return 0;
}
return date.getMillis();
}
public static class Builder implements EventSerializer.Builder {
@Override
public EventSerializer build(Context context, OutputStream out)
{
CustomLogAvroEventSerializer writer = null;
try {
writer = new CustomLogAvroEventSerializer(out);
writer.configure(context);
} catch (IOException e) {
logger.error("Unable to parse schema file. Exception
follows.", e);
}
return writer;
}
}
}
Please suggest me i need output like this and i want to customize like log4j
-------------------------------------------------------------------------------------------------------------------------------------
172 [main] FATAL com.cisco.flume.FlumeTest - Sample fatal message
188 [main] INFO com.cisco.flume.FlumeTest - Sample info message1
203 [main] WARN com.cisco.flume.FlumeTest - Sample warn message
219 [main] INFO com.cisco.flume.FlumeTest - Sample info message2
219 [main] INFO com.cisco.flume.FlumeTest - Sample info message3
266 [main] ERROR com.cisco.flume.FlumeTest - Sample error message
282 [main] FATAL com.cisco.flume.FlumeTest - Sample fatal message
282 [main] INFO com.cisco.flume.FlumeTest - Sample info message4
--
JP