diff --git contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/hadoop/mapreduce/jobhistory/JobHistoryAppender.java contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/hadoop/mapreduce/jobhistory/JobHistoryAppender.java index 7f9769e..b421f24 100644 --- contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/hadoop/mapreduce/jobhistory/JobHistoryAppender.java +++ contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/hadoop/mapreduce/jobhistory/JobHistoryAppender.java @@ -36,6 +36,8 @@ import org.apache.log4j.spi.LoggingEvent; public class JobHistoryAppender extends AppenderSkeleton implements Appender { private static final Log LOG = LogFactory.getLog(JobHistoryAppender.class); + + public static final int QUEUE_CAPACITY = 256; private final Queue events; private LoggingThreadRunnable logThreadRunnable; @@ -63,7 +65,7 @@ public class JobHistoryAppender extends AppenderSkeleton implements Appender { private LogStore logStore; public JobHistoryAppender() { - events = new LinkedBlockingQueue(); + events = new LinkedBlockingQueue(QUEUE_CAPACITY); logParser = new MapReduceJobHistoryParser(); logStore = nullStore; } @@ -116,30 +118,27 @@ public class JobHistoryAppender extends AppenderSkeleton implements Appender { logStore = new DatabaseStore(driver, database, user, password, new MapReduceJobHistoryUpdater()); + logThreadRunnable = + new LoggingThreadRunnable(events, logParser, logStore); + logThread = new Thread(logThreadRunnable); + logThread.setDaemon(true); + logThread.start(); } catch (IOException ioe) { - LOG.debug("Failed to connect to db " + database, ioe); + LOG.warn("Failed to connect to db " + database, ioe); System.err.println("Failed to connect to db " + database + " as user " + user + " password " + password + " and driver " + driver + " with " + StringUtils.stringifyException(ioe)); - throw new RuntimeException( - "Failed to create database store for " + database, ioe); } catch (Exception e) { - LOG.debug("Failed to connect to db " + database, e); + LOG.warn("Failed to connect to db " + database + + " as user " + user + " password " + password + + " and driver " + driver, e); System.err.println("Failed to connect to db " + database + " as user " + user + " password " + password + " and driver " + driver + " with " + StringUtils.stringifyException(e)); - throw new RuntimeException( - "Failed to create database store for " + database, e); } } - logThreadRunnable = - new LoggingThreadRunnable(events, logParser, logStore); - logThread = new Thread(logThreadRunnable); - logThread.setDaemon(true); - logThread.start(); - super.activateOptions(); } } @@ -150,11 +149,15 @@ public class JobHistoryAppender extends AppenderSkeleton implements Appender { logThreadRunnable.close(); } catch (IOException ioe) { LOG.info("Failed to close logThreadRunnable", ioe); + } catch (Exception e) { + LOG.info("Failed to close logThreadRunnable", e); } try { logThread.join(1000); } catch (InterruptedException ie) { LOG.info("logThread interrupted", ie); + } catch (Exception e) { + LOG.info("logThread interrupted", e); } } @@ -164,7 +167,14 @@ public class JobHistoryAppender extends AppenderSkeleton implements Appender { } @Override - protected void append(LoggingEvent event) { - events.add(event); + public void append(LoggingEvent event) { + if (!events.offer(event)) { + //fail, queue is full, there is a chance database is down + LOG.warn("workflow event queue full, there is a chance database is down"); + } + } + + public Queue getEventQueue() { + return events; } } diff --git contrib/ambari-log4j/src/test/java/org/apache/ambari/TestJobHistoryAppender.java contrib/ambari-log4j/src/test/java/org/apache/ambari/TestJobHistoryAppender.java new file mode 100644 index 0000000..55167a7 --- /dev/null +++ contrib/ambari-log4j/src/test/java/org/apache/ambari/TestJobHistoryAppender.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari; + +import junit.framework.Assert; +import junit.framework.TestCase; +import org.apache.ambari.log4j.hadoop.mapreduce.jobhistory.JobHistoryAppender; +import org.apache.log4j.Category; +import org.apache.log4j.spi.LoggingEvent; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; + +/** + * + */ +public class TestJobHistoryAppender extends TestCase { + + public void testEventQueueCapacity() throws IllegalAccessException, + InstantiationException, InvocationTargetException, NoSuchMethodException { + JobHistoryAppender jobHistoryAppender = new JobHistoryAppender(); + Constructor constructor = Category.class.getDeclaredConstructor(String.class); + constructor.setAccessible(true); + Category c = (Category) constructor.newInstance("test"); + for (int i = 0; i < JobHistoryAppender.QUEUE_CAPACITY; i++) { + LoggingEvent event = newLoggingEvent(c); + jobHistoryAppender.append(event); + } + Assert.assertTrue(jobHistoryAppender.getEventQueue().size() == + JobHistoryAppender.QUEUE_CAPACITY); + LoggingEvent event = newLoggingEvent(c); + try { + jobHistoryAppender.append(event); + } catch (Exception e) { + fail("jobHistoryAppender should not throw an exception \n" + e); + } + Assert.assertTrue(jobHistoryAppender.getEventQueue().size() == + JobHistoryAppender.QUEUE_CAPACITY); + } + + private LoggingEvent newLoggingEvent(Category c) throws IllegalAccessException, + InvocationTargetException, InstantiationException { + return new LoggingEvent(null, c, null, null, null); + } + +}