Repository: incubator-samza Updated Branches: refs/heads/master 0da1090f2 -> d8fc65f73
SAMZA-310: Publish container logs to a SystemStream Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/d8fc65f7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/d8fc65f7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/d8fc65f7 Branch: refs/heads/master Commit: d8fc65f730cc5aa18af93057de7e4f5bea36e9bb Parents: 0da1090 Author: Yan Fang <[email protected]> Authored: Tue Nov 25 16:45:39 2014 -0800 Committer: Yan Fang <[email protected]> Committed: Tue Nov 25 16:45:39 2014 -0800 ---------------------------------------------------------------------- build.gradle | 2 + .../versioned/jobs/configuration-table.html | 14 ++ .../documentation/versioned/jobs/logging.md | 18 ++ .../apache/samza/container/SamzaContainer.scala | 28 ++- .../samza/container/TestSamzaContainer.scala | 17 -- .../apache/samza/config/Log4jSystemConfig.java | 105 +++++++++++ .../samza/logging/log4j/StreamAppender.java | 185 +++++++++++++++++++ .../samza/config/TestLog4jSystemConfig.java | 67 +++++++ .../samza/logging/log4j/MockSystemFactory.java | 45 +++++ .../samza/logging/log4j/MockSystemProducer.java | 53 ++++++ .../samza/logging/log4j/TestStreamAppender.java | 70 +++++++ .../apache/samza/job/yarn/SamzaAppMaster.scala | 5 + 12 files changed, 582 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/d8fc65f7/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index a9c4218..38383bd 100644 --- a/build.gradle +++ b/build.gradle @@ -175,6 +175,8 @@ project(':samza-log4j') { dependencies { compile "log4j:log4j:$log4jVersion" + compile project(':samza-api') + compile project(":samza-core_$scalaVersion") testCompile "junit:junit:$junitVersion" } } 
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/d8fc65f7/docs/learn/documentation/versioned/jobs/configuration-table.html ---------------------------------------------------------------------- diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html index fbb5ea4..12955ad 100644 --- a/docs/learn/documentation/versioned/jobs/configuration-table.html +++ b/docs/learn/documentation/versioned/jobs/configuration-table.html @@ -374,6 +374,20 @@ </tr> <tr> + <td class="property" id="task-log4j-system">task.log4j.system</td> + <td class="default"></td> + <td class="description"> + Specify the system name for the StreamAppender. If this property is not specified and there is + only one system in the config, Samza uses that system for the log4j appender. If this property + is not specified and there is more than one system in the config, Samza throws an exception. (See + <a href="logging.html#stream-log4j-appender">Stream Log4j Appender</a>.) + <dl> + <dt>Example: <code>task.log4j.system=kafka</code></dt> + </dl> + </td> + </tr> + + <tr> <td class="property" id="task-poll-interval-ms">task.poll.interval.ms</td> <td class="default"></td> <td class="description"> http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/d8fc65f7/docs/learn/documentation/versioned/jobs/logging.md ---------------------------------------------------------------------- diff --git a/docs/learn/documentation/versioned/jobs/logging.md b/docs/learn/documentation/versioned/jobs/logging.md index 58e56c1..af2fd0e 100644 --- a/docs/learn/documentation/versioned/jobs/logging.md +++ b/docs/learn/documentation/versioned/jobs/logging.md @@ -95,6 +95,24 @@ And then updating your log4j.xml to include the appender: <appender name="jmx" class="org.apache.samza.logging.log4j.JmxAppender" /> {% endhighlight %} +#### Stream Log4j Appender + +Samza provides a StreamAppender to publish the logs into a 
specific system. You can specify the system name using "task.log4j.system". If that property is not set and there is only one system in the config, Samza uses that system to publish the logs. Samza also sets the [MDC](http://logback.qos.ch/manual/mdc.html) keys "containerName", "jobName" and "jobId", which help identify the source of each log message. In order to use this appender, simply add: + +{% highlight xml %} +<appender name="StreamAppender" class="org.apache.samza.logging.log4j.StreamAppender"> + <layout class="org.apache.log4j.PatternLayout"> + <param name="ConversionPattern" value="%X{containerName} %X{jobName} %X{jobId} %d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n" /> + </layout> +</appender> +{% endhighlight %} + +and reference the appender from the logger that should use it: + +{% highlight xml %} +<appender-ref ref="StreamAppender"/> +{% endhighlight %} + ### Log Directory Samza will look for the `SAMZA_LOG_DIR` environment variable when it executes. If this variable is defined, all logs will be written to this directory. If the environment variable is empty, or not defined, then Samza will use `$base_dir`, which is the directory one level up from Samza's [run-class.sh](packaging.html) script. This environment variable can also be referenced inside log4j.xml files (see above). 
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/d8fc65f7/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index 2b53440..2f1568d 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -22,7 +22,7 @@ package org.apache.samza.container import java.io.File import org.apache.samza.Partition import org.apache.samza.SamzaException -import org.apache.samza.checkpoint.{CheckpointManagerFactory, OffsetManager} +import org.apache.samza.checkpoint.{ CheckpointManagerFactory, OffsetManager } import org.apache.samza.config.Config import org.apache.samza.config.MetricsConfig.Config2Metrics import org.apache.samza.config.SerializerConfig.Config2Serializer @@ -63,26 +63,34 @@ import org.apache.samza.job.model.ContainerModel import org.apache.samza.coordinator.JobCoordinator import org.apache.samza.serializers.model.SamzaObjectMapper import org.apache.samza.job.model.JobModel +import org.apache.samza.config.JobConfig.Config2Job object SamzaContainer extends Logging { def main(args: Array[String]) { - safeMain() + safeMain(() => new JmxServer) } - def safeMain(jmxServer: JmxServer = new JmxServer) { + def safeMain(newJmxServer: () => JmxServer) { + putMDC("containerName", "samza-container-" + System.getenv(ShellCommandConfig.ENV_CONTAINER_ID)) // Break out the main method to make the JmxServer injectable so we can // validate that we don't leak JMX non-daemon threads if we have an // exception in the main method. 
- try { - val containerId = System.getenv(ShellCommandConfig.ENV_CONTAINER_ID).toInt - val coordinatorUrl = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL) - val jobModel = readJobModel(coordinatorUrl) - val containerModel = jobModel.getContainers()(containerId.toInt) - val config = jobModel.getConfig + val containerId = System.getenv(ShellCommandConfig.ENV_CONTAINER_ID).toInt + val coordinatorUrl = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL) + val jobModel = readJobModel(coordinatorUrl) + val containerModel = jobModel.getContainers()(containerId.toInt) + val config = jobModel.getConfig + putMDC("jobName", config.getName.getOrElse(throw new SamzaException("can not find the job name"))) + putMDC("jobId", config.getJobId.getOrElse("1")) + var jmxServer: JmxServer = null + try { + jmxServer = newJmxServer() SamzaContainer(containerModel, config).run } finally { - jmxServer.stop + if (jmxServer != null) { + jmxServer.stop + } } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/d8fc65f7/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala index a0ea8b6..acded7d 100644 --- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala +++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala @@ -76,23 +76,6 @@ class TestSamzaContainer extends AssertionsForJUnit { } @Test - def testJmxServerShutdownOnException { - var stopped = false - val jmxServer = new JmxServer { - override def stop { - super.stop - stopped = true - } - } - intercept[Exception] { - // Calling main will trigger an NPE since the container checks for an - // isCompressed environment variable, which isn't set. 
- SamzaContainer.safeMain(jmxServer) - } - assertTrue(stopped) - } - - @Test def testGetInputStreamMetadata { val inputStreams = Set( new SystemStreamPartition("test", "stream1", new Partition(0)), http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/d8fc65f7/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java ---------------------------------------------------------------------- diff --git a/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java b/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java new file mode 100644 index 0000000..5f5195c --- /dev/null +++ b/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.config; + +import java.util.ArrayList; +import java.util.Map; + +/** + * This class contains the methods for getting properties that are needed by the + * StreamAppender. 
+ */ +public class Log4jSystemConfig { + + private static final String TASK_LOG4J_SYSTEM = "task.log4j.system"; + private static final String SYSTEM_PREFIX = "systems."; + private static final String SYSTEM_FACTORY_SUFFIX = ".samza.factory"; + private static final String EMPTY = ""; + private Config config = null; + + public Log4jSystemConfig(Config config) { + this.config = config; + } + + /** + * Get the log4j system name from the config. If it's not defined, try to + * guess the system name if there is only one system is defined. + * + * @return log4j system name + */ + public String getSystemName() { + String log4jSystem = getValue(TASK_LOG4J_SYSTEM); + if (log4jSystem == null) { + ArrayList<String> systemNames = getSystemNames(); + if (systemNames.size() == 1) { + log4jSystem = systemNames.get(0); + } else { + throw new ConfigException("Missing task.log4j.system configuration, and more than 1 systems were found."); + } + } + return log4jSystem; + } + + public String getJobName() { + return getValue(JobConfig.JOB_NAME()); + } + + public String getJobId() { + return getValue(JobConfig.JOB_ID()); + } + + public String getSystemFactory(String name) { + if (name == null) { + return null; + } + String systemFactory = String.format(SystemConfig.SYSTEM_FACTORY(), name); + return getValue(systemFactory); + } + + /** + * a helper method to get the value from the config. If the config does not + * contain the key, return null. 
+ * + * @param key + * @return value of the key in the config + */ + protected String getValue(String key) { + if (config.containsKey(key)) { + return config.get(key); + } else { + return null; + } + } + + /** + * get a list of system names + */ + protected ArrayList<String> getSystemNames() { + Config subConf = config.subset(SYSTEM_PREFIX, true); + ArrayList<String> systemNames = new ArrayList<String>(); + for (Map.Entry<String, String> entry : subConf.entrySet()) { + String key = entry.getKey(); + if (key.endsWith(SYSTEM_FACTORY_SUFFIX)) { + systemNames.add(key.replace(SYSTEM_FACTORY_SUFFIX, EMPTY)); + } + } + return systemNames; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/d8fc65f7/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java ---------------------------------------------------------------------- diff --git a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java new file mode 100644 index 0000000..9a9d648 --- /dev/null +++ b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.logging.log4j; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.net.URL; + +import org.apache.log4j.AppenderSkeleton; +import org.apache.log4j.Logger; +import org.apache.log4j.spi.LoggingEvent; +import org.apache.samza.SamzaException; +import org.apache.samza.config.Config; +import org.apache.samza.config.Log4jSystemConfig; +import org.apache.samza.config.ShellCommandConfig; +import org.apache.samza.job.model.JobModel; +import org.apache.samza.metrics.MetricsRegistryMap; +import org.apache.samza.serializers.model.SamzaObjectMapper; +import org.apache.samza.system.OutgoingMessageEnvelope; +import org.apache.samza.system.SystemFactory; +import org.apache.samza.system.SystemProducer; +import org.apache.samza.system.SystemStream; +import org.apache.samza.util.Util; + +/** + * StreamAppender is a log4j appender that sends logs to the system which is + * specified by the user in the Samza config. It can send to any system as long + * as the system is defined appropriately in the config. 
+ */ +public class StreamAppender extends AppenderSkeleton { + + private final String JAVA_OPTS_CONTAINER_NAME = "samza.container.name"; + private final String APPLICATION_MASTER_TAG = "samza-application-master"; + private final String SOURCE = "log4j-log"; + private Config config = null; + private SystemStream systemStream = null; + private SystemProducer systemProducer = null; + private String key = null; + private String streamName = null; + private boolean isApplicationMaster = false; + private Logger log = Logger.getLogger(StreamAppender.class); + + /** + * used to detect if this thread is called recursively + */ + private final ThreadLocal<Boolean> recursiveCall = new ThreadLocal<Boolean>() { + @Override + protected Boolean initialValue() { + return false; + } + }; + + public String getStreamName() { + return this.streamName; + } + + public void setStreamName(String streamName) { + this.streamName = streamName; + } + + @Override + public void activateOptions() { + String containerName = System.getProperty(JAVA_OPTS_CONTAINER_NAME); + isApplicationMaster = containerName.contains(APPLICATION_MASTER_TAG); + key = containerName; // use the container name as the key for the logs + config = getConfig(); + SystemFactory systemFactory = null; + Log4jSystemConfig log4jSystemConfig = new Log4jSystemConfig(config); + + if (streamName == null) { + streamName = getStreamName(log4jSystemConfig.getJobName(), log4jSystemConfig.getJobId()); + } + + String systemName = log4jSystemConfig.getSystemName(); + String systemFactoryName = log4jSystemConfig.getSystemFactory(systemName); + if (systemFactoryName != null) { + systemFactory = Util.<SystemFactory> getObj(systemFactoryName); + } else { + throw new SamzaException("Please define log4j system name and factory class"); + } + + systemProducer = systemFactory.getProducer(systemName, config, new MetricsRegistryMap()); + systemStream = new SystemStream(systemName, streamName); + systemProducer.register(SOURCE); + 
systemProducer.start(); + + log.info(SOURCE + " has been registered in " + systemName + ". So all the logs will be sent to " + streamName + + " in " + systemName + ". Logs are partitioned by " + key); + } + + @Override + protected void append(LoggingEvent event) { + if (!recursiveCall.get()) { + try { + recursiveCall.set(true); + OutgoingMessageEnvelope outgoingMessageEnvelope = + new OutgoingMessageEnvelope(systemStream, key.getBytes("UTF-8"), subAppend(event).getBytes("UTF-8")); + systemProducer.send(SOURCE, outgoingMessageEnvelope); + } catch (UnsupportedEncodingException e) { + throw new SamzaException("can not send the log messages", e); + } finally { + recursiveCall.set(false); + } + } + } + + private String subAppend(LoggingEvent event) { + if (this.layout == null) { + return event.getRenderedMessage(); + } else { + return this.layout.format(event).trim(); + } + } + + @Override + public void close() { + if (!this.closed) { + this.closed = true; + flushSystemProducer(); + systemProducer.stop(); + } + } + + @Override + public boolean requiresLayout() { + return false; + } + + /** + * force the system producer to flush the messages + */ + public void flushSystemProducer() { + systemProducer.flush(SOURCE); + } + + /** + * get the config for the AM or containers based on the containers' names. 
+ * + * @return Config the config of this container + */ + protected Config getConfig() { + Config config = null; + + try { + if (isApplicationMaster) { + config = SamzaObjectMapper.getObjectMapper().readValue(System.getenv(ShellCommandConfig.ENV_CONFIG()), Config.class); + } else { + String url = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL()); + config = SamzaObjectMapper.getObjectMapper().readValue(Util.read(new URL(url), 30000), JobModel.class).getConfig(); + } + } catch (IOException e) { + throw new SamzaException("can not read the config", e); + } + + return config; + } + + private String getStreamName(String jobName, String jobId) { + if (jobName == null) { + throw new SamzaException("job name is null. Please specify job.name"); + } + if (jobId == null) { + jobId = "1"; + } + String streamName = "__samza_" + jobName + "_" + jobId + "_logs"; + return streamName.replace("-", "_"); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/d8fc65f7/samza-log4j/src/test/java/org/apache/samza/config/TestLog4jSystemConfig.java ---------------------------------------------------------------------- diff --git a/samza-log4j/src/test/java/org/apache/samza/config/TestLog4jSystemConfig.java b/samza-log4j/src/test/java/org/apache/samza/config/TestLog4jSystemConfig.java new file mode 100644 index 0000000..64a1e70 --- /dev/null +++ b/samza-log4j/src/test/java/org/apache/samza/config/TestLog4jSystemConfig.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.config; + +import static org.junit.Assert.*; + +import java.util.HashMap; +import java.util.Map; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +public class TestLog4jSystemConfig { + + @Rule + public ExpectedException exception = ExpectedException.none(); + + @Test + public void testGetSystemNamesAndGetValue() { + Map<String, String> map = new HashMap<String, String>(); + map.put("systems.system1.samza.factory","1"); + map.put("systems.system2.samza.factory","2"); + Log4jSystemConfig log4jSystemConfig = new Log4jSystemConfig(new MapConfig(map)); + + assertEquals(2, log4jSystemConfig.getSystemNames().size()); + assertEquals("1", log4jSystemConfig.getValue("systems.system1.samza.factory")); + } + + @Test + public void testGetLog4jSystemName() { + Map<String, String> map = new HashMap<String, String>(); + map.put("task.log4j.system", "log4j-system"); + map.put("systems.system1.samza.factory","1"); + + Log4jSystemConfig log4jSystemConfig = new Log4jSystemConfig(new MapConfig(map)); + assertEquals("log4j-system", log4jSystemConfig.getSystemName()); + + // use the default system name + map.remove("task.log4j.system"); + log4jSystemConfig = new Log4jSystemConfig(new MapConfig(map)); + assertEquals("system1", log4jSystemConfig.getSystemName()); + + // throw ConfigException + map.put("systems.system2.samza.factory", "2"); + log4jSystemConfig = new Log4jSystemConfig(new MapConfig(map)); + exception.expect(ConfigException.class); + log4jSystemConfig.getSystemName(); + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/d8fc65f7/samza-log4j/src/test/java/org/apache/samza/logging/log4j/MockSystemFactory.java ---------------------------------------------------------------------- diff --git a/samza-log4j/src/test/java/org/apache/samza/logging/log4j/MockSystemFactory.java b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/MockSystemFactory.java new file mode 100644 index 0000000..cdc1245 --- /dev/null +++ b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/MockSystemFactory.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.logging.log4j; + +import org.apache.samza.config.Config; +import org.apache.samza.metrics.MetricsRegistry; +import org.apache.samza.system.SystemAdmin; +import org.apache.samza.system.SystemConsumer; +import org.apache.samza.system.SystemFactory; +import org.apache.samza.system.SystemProducer; + +public class MockSystemFactory implements SystemFactory { + + @Override + public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) { + return null; + } + + @Override + public SystemProducer getProducer(String systemName, Config config, MetricsRegistry registry) { + return new MockSystemProducer(); + } + + @Override + public SystemAdmin getAdmin(String systemName, Config config) { + return null; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/d8fc65f7/samza-log4j/src/test/java/org/apache/samza/logging/log4j/MockSystemProducer.java ---------------------------------------------------------------------- diff --git a/samza-log4j/src/test/java/org/apache/samza/logging/log4j/MockSystemProducer.java b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/MockSystemProducer.java new file mode 100644 index 0000000..8d99094 --- /dev/null +++ b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/MockSystemProducer.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.logging.log4j; + +import java.util.ArrayList; + +import org.apache.log4j.Logger; +import org.apache.samza.system.OutgoingMessageEnvelope; +import org.apache.samza.system.SystemProducer; + +public class MockSystemProducer implements SystemProducer { + static public ArrayList<Object> messagesReceived = new ArrayList<Object>(); + static private Logger log = Logger.getLogger(MockSystemProducer.class); + + @Override + public void start() { + log.info("mock system producer is started..."); + } + + @Override + public void stop() { + } + + @Override + public void register(String source) { + } + + @Override + public void send(String source, OutgoingMessageEnvelope envelope) { + messagesReceived.add(envelope.getMessage()); + } + + @Override + public void flush(String source) { + } +} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/d8fc65f7/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java ---------------------------------------------------------------------- diff --git a/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java new file mode 100644 index 0000000..46e4b8c --- /dev/null +++ b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.logging.log4j; + +import static org.junit.Assert.*; + +import java.util.HashMap; +import java.util.Map; +import org.apache.log4j.Logger; +import org.apache.log4j.PatternLayout; +import org.apache.samza.config.Config; +import org.apache.samza.config.MapConfig; +import org.junit.Test; + +public class TestStreamAppender { + + static Logger log = Logger.getLogger(TestStreamAppender.class); + + @Test + public void testSystemProducerAppender() { + System.setProperty("samza.container.name", "samza-container-1"); + + MockSystemProducerAppender systemProducerAppender = new MockSystemProducerAppender(); + PatternLayout layout = new PatternLayout(); + layout.setConversionPattern("%m"); + systemProducerAppender.setLayout(layout); + systemProducerAppender.activateOptions(); + log.addAppender(systemProducerAppender); + log.info("testing"); + log.info("testing2"); + + systemProducerAppender.flushSystemProducer(); + + assertEquals(2, MockSystemProducer.messagesReceived.size()); + assertEquals("testing", new String((byte[])MockSystemProducer.messagesReceived.get(0))); + assertEquals("testing2", new String((byte[])MockSystemProducer.messagesReceived.get(1))); + } + + /** + * a mock class which overrides the getConfig method in SystemProducerAppener + * for 
testing purpose. Because the environment variable where the config + * stays is difficult to test. + */ + class MockSystemProducerAppender extends StreamAppender { + @Override + protected Config getConfig() { + Map<String, String> map = new HashMap<String, String>(); + map.put("job.name", "log4jTest"); + map.put("systems.mock.samza.factory", MockSystemFactory.class.getCanonicalName()); + return new MapConfig(map); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/d8fc65f7/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala index 91aff3c..a1dbe04 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala @@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils import org.apache.samza.config.MapConfig import org.apache.samza.config.Config import org.apache.samza.config.ShellCommandConfig +import org.apache.samza.config.JobConfig.Config2Job import org.apache.samza.config.YarnConfig import org.apache.samza.config.YarnConfig.Config2Yarn import org.apache.samza.job.yarn.SamzaAppMasterTaskManager.DEFAULT_CONTAINER_MEM @@ -38,6 +39,7 @@ import org.apache.samza.metrics.MetricsRegistryMap import org.apache.samza.util.hadoop.HttpFileSystem import org.apache.samza.util.Logging import org.apache.samza.serializers.model.SamzaObjectMapper +import org.apache.samza.SamzaException /** * When YARN executes an application master, it needs a bash command to @@ -56,6 +58,7 @@ object SamzaAppMaster extends Logging with AMRMClientAsync.CallbackHandler { var storedException: Throwable = null def main(args: Array[String]) { + putMDC("containerName", "samza-application-master") 
val containerIdStr = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.toString) info("got container id: %s" format containerIdStr) val containerId = ConverterUtils.toContainerId(containerIdStr) @@ -69,6 +72,8 @@ object SamzaAppMaster extends Logging with AMRMClientAsync.CallbackHandler { info("got node manager http port: %s" format nodeHttpPortString) val config = new MapConfig(SamzaObjectMapper.getObjectMapper.readValue(System.getenv(ShellCommandConfig.ENV_CONFIG), classOf[Config])) info("got config: %s" format config) + putMDC("jobName", config.getName.getOrElse(throw new SamzaException("can not find the job name"))) + putMDC("jobId", config.getJobId.getOrElse("1")) val hConfig = new YarnConfiguration hConfig.set("fs.http.impl", classOf[HttpFileSystem].getName) val interval = config.getAMPollIntervalMs.getOrElse(DEFAULT_POLL_INTERVAL_MS)
