Repository: incubator-samza
Updated Branches:
  refs/heads/master 0da1090f2 -> d8fc65f73


SAMZA-310: Publish container logs to a SystemStream


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/d8fc65f7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/d8fc65f7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/d8fc65f7

Branch: refs/heads/master
Commit: d8fc65f730cc5aa18af93057de7e4f5bea36e9bb
Parents: 0da1090
Author: Yan Fang <[email protected]>
Authored: Tue Nov 25 16:45:39 2014 -0800
Committer: Yan Fang <[email protected]>
Committed: Tue Nov 25 16:45:39 2014 -0800

----------------------------------------------------------------------
 build.gradle                                    |   2 +
 .../versioned/jobs/configuration-table.html     |  14 ++
 .../documentation/versioned/jobs/logging.md     |  18 ++
 .../apache/samza/container/SamzaContainer.scala |  28 ++-
 .../samza/container/TestSamzaContainer.scala    |  17 --
 .../apache/samza/config/Log4jSystemConfig.java  | 105 +++++++++++
 .../samza/logging/log4j/StreamAppender.java     | 185 +++++++++++++++++++
 .../samza/config/TestLog4jSystemConfig.java     |  67 +++++++
 .../samza/logging/log4j/MockSystemFactory.java  |  45 +++++
 .../samza/logging/log4j/MockSystemProducer.java |  53 ++++++
 .../samza/logging/log4j/TestStreamAppender.java |  70 +++++++
 .../apache/samza/job/yarn/SamzaAppMaster.scala  |   5 +
 12 files changed, 582 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/d8fc65f7/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index a9c4218..38383bd 100644
--- a/build.gradle
+++ b/build.gradle
@@ -175,6 +175,8 @@ project(':samza-log4j') {
 
   dependencies {
     compile "log4j:log4j:$log4jVersion"
+    compile project(':samza-api')
+    compile project(":samza-core_$scalaVersion")
     testCompile "junit:junit:$junitVersion"
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/d8fc65f7/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html 
b/docs/learn/documentation/versioned/jobs/configuration-table.html
index fbb5ea4..12955ad 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -374,6 +374,20 @@
                 </tr>
 
                 <tr>
+                    <td class="property" 
id="task-log4j-system">task.log4j.system</td>
+                    <td class="default"></td>
+                    <td class="description">
+                        Specify the system name for the StreamAppender. If 
this property is not specified and there is
+                        only one system in the config, Samza will use 
that system for the log4j appender. If this property
+                        is not specified and there is more than one system in 
the config, Samza throws an exception. (See
+                        <a href="logging.html#stream-log4j-appender">Stream 
Log4j Appender</a>)
+                        <dl>
+                            <dt>Example: 
<code>task.log4j.system=kafka</code></dt>
+                        </dl>
+                    </td>
+                </tr>
+
+                <tr>
                     <td class="property" 
id="task-poll-interval-ms">task.poll.interval.ms</td>
                     <td class="default"></td>
                     <td class="description">

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/d8fc65f7/docs/learn/documentation/versioned/jobs/logging.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/logging.md 
b/docs/learn/documentation/versioned/jobs/logging.md
index 58e56c1..af2fd0e 100644
--- a/docs/learn/documentation/versioned/jobs/logging.md
+++ b/docs/learn/documentation/versioned/jobs/logging.md
@@ -95,6 +95,24 @@ And then updating your log4j.xml to include the appender:
 <appender name="jmx" class="org.apache.samza.logging.log4j.JmxAppender" />
 {% endhighlight %}
 
+#### Stream Log4j Appender
+
+Samza provides a StreamAppender to publish the logs into a specific system. 
You can specify the system name using "task.log4j.system". If there is only one 
system in the config, Samza will use that system for the log publishing. Samza 
also sets the [MDC](http://logback.qos.ch/manual/mdc.html) keys "containerName", 
"jobName" and "jobId", which help identify the source of the log. In order to 
use this appender, simply add:
+
+{% highlight xml %}
+<appender name="StreamAppender" 
class="org.apache.samza.logging.log4j.StreamAppender">
+   <layout class="org.apache.log4j.PatternLayout">
+     <param name="ConversionPattern" value="%X{containerName} %X{jobName} 
%X{jobId} %d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n" />
+   </layout>
+</appender>
+{% endhighlight %}
+
+and add:
+
+{% highlight xml %}
+<appender-ref ref="StreamAppender"/>
+{% endhighlight %}
+
 ### Log Directory
 
 Samza will look for the `SAMZA_LOG_DIR` environment variable when it executes. 
If this variable is defined, all logs will be written to this directory. If the 
environment variable is empty, or not defined, then Samza will use `$base_dir`, 
which is the directory one level up from Samza's [run-class.sh](packaging.html) 
script. This environment variable can also be referenced inside log4j.xml files 
(see above).

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/d8fc65f7/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 2b53440..2f1568d 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -22,7 +22,7 @@ package org.apache.samza.container
 import java.io.File
 import org.apache.samza.Partition
 import org.apache.samza.SamzaException
-import org.apache.samza.checkpoint.{CheckpointManagerFactory, OffsetManager}
+import org.apache.samza.checkpoint.{ CheckpointManagerFactory, OffsetManager }
 import org.apache.samza.config.Config
 import org.apache.samza.config.MetricsConfig.Config2Metrics
 import org.apache.samza.config.SerializerConfig.Config2Serializer
@@ -63,26 +63,34 @@ import org.apache.samza.job.model.ContainerModel
 import org.apache.samza.coordinator.JobCoordinator
 import org.apache.samza.serializers.model.SamzaObjectMapper
 import org.apache.samza.job.model.JobModel
+import org.apache.samza.config.JobConfig.Config2Job
 
 object SamzaContainer extends Logging {
   def main(args: Array[String]) {
-    safeMain()
+    safeMain(() => new JmxServer)
   }
 
-  def safeMain(jmxServer: JmxServer = new JmxServer) {
+  def safeMain(newJmxServer: () => JmxServer) {
+    putMDC("containerName", "samza-container-" + 
System.getenv(ShellCommandConfig.ENV_CONTAINER_ID))
     // Break out the main method to make the JmxServer injectable so we can
     // validate that we don't leak JMX non-daemon threads if we have an
     // exception in the main method.
-    try {
-      val containerId = 
System.getenv(ShellCommandConfig.ENV_CONTAINER_ID).toInt
-      val coordinatorUrl = 
System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL)
-      val jobModel = readJobModel(coordinatorUrl)
-      val containerModel = jobModel.getContainers()(containerId.toInt)
-      val config = jobModel.getConfig
+    val containerId = System.getenv(ShellCommandConfig.ENV_CONTAINER_ID).toInt
+    val coordinatorUrl = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL)
+    val jobModel = readJobModel(coordinatorUrl)
+    val containerModel = jobModel.getContainers()(containerId.toInt)
+    val config = jobModel.getConfig
+    putMDC("jobName", config.getName.getOrElse(throw new SamzaException("can 
not find the job name")))
+    putMDC("jobId", config.getJobId.getOrElse("1"))
+    var jmxServer: JmxServer = null
 
+    try {
+      jmxServer = newJmxServer()
       SamzaContainer(containerModel, config).run
     } finally {
-      jmxServer.stop
+      if (jmxServer != null) {
+        jmxServer.stop
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/d8fc65f7/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 
b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index a0ea8b6..acded7d 100644
--- 
a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ 
b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -76,23 +76,6 @@ class TestSamzaContainer extends AssertionsForJUnit {
   }
 
   @Test
-  def testJmxServerShutdownOnException {
-    var stopped = false
-    val jmxServer = new JmxServer {
-      override def stop {
-        super.stop
-        stopped = true
-      }
-    }
-    intercept[Exception] {
-      // Calling main will trigger an NPE since the container checks for an
-      // isCompressed environment variable, which isn't set.
-      SamzaContainer.safeMain(jmxServer)
-    }
-    assertTrue(stopped)
-  }
-
-  @Test
   def testGetInputStreamMetadata {
     val inputStreams = Set(
       new SystemStreamPartition("test", "stream1", new Partition(0)),

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/d8fc65f7/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
----------------------------------------------------------------------
diff --git 
a/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java 
b/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
new file mode 100644
index 0000000..5f5195c
--- /dev/null
+++ b/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config;
+
+import java.util.ArrayList;
+import java.util.Map;
+
+/**
+ * This class contains the methods for getting properties that are needed by 
the
+ * StreamAppender.
+ */
+public class Log4jSystemConfig {
+
+  private static final String TASK_LOG4J_SYSTEM = "task.log4j.system";
+  private static final String SYSTEM_PREFIX = "systems.";
+  private static final String SYSTEM_FACTORY_SUFFIX = ".samza.factory";
+  private static final String EMPTY = "";
+  private Config config = null;
+
+  public Log4jSystemConfig(Config config) {
+    this.config = config;
+  }
+
+  /**
+   * Get the log4j system name from the config. If it's not defined, fall
+   * back to the only configured system when exactly one system is defined.
+   *
+   * @return log4j system name
+   */
+  public String getSystemName() {
+    String log4jSystem = getValue(TASK_LOG4J_SYSTEM);
+    if (log4jSystem == null) {
+      ArrayList<String> systemNames = getSystemNames();
+      if (systemNames.size() == 1) {
+        log4jSystem = systemNames.get(0);
+      } else {
+        throw new ConfigException("Missing task.log4j.system configuration, 
and more than 1 systems were found.");
+      }
+    }
+    return log4jSystem;
+  }
+
+  public String getJobName() {
+    return getValue(JobConfig.JOB_NAME());
+  }
+
+  public String getJobId() {
+    return getValue(JobConfig.JOB_ID());
+  }
+
+  public String getSystemFactory(String name) {
+    if (name == null) {
+      return null;
+    }
+    String systemFactory = String.format(SystemConfig.SYSTEM_FACTORY(), name);
+    return getValue(systemFactory);
+  }
+
+  /**
+   * a helper method to get the value from the config. If the config does not
+   * contain the key, return null.
+   *
+   * @param key
+   * @return value of the key in the config
+   */
+  protected String getValue(String key) {
+    if (config.containsKey(key)) {
+      return config.get(key);
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * get a list of system names
+   */
+  protected ArrayList<String> getSystemNames() {
+    Config subConf = config.subset(SYSTEM_PREFIX, true);
+    ArrayList<String> systemNames = new ArrayList<String>();
+    for (Map.Entry<String, String> entry : subConf.entrySet()) {
+      String key = entry.getKey();
+      if (key.endsWith(SYSTEM_FACTORY_SUFFIX)) {
+        systemNames.add(key.replace(SYSTEM_FACTORY_SUFFIX, EMPTY));
+      }
+    }
+    return systemNames;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/d8fc65f7/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
----------------------------------------------------------------------
diff --git 
a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java 
b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
new file mode 100644
index 0000000..9a9d648
--- /dev/null
+++ 
b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.logging.log4j;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URL;
+
+import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.Logger;
+import org.apache.log4j.spi.LoggingEvent;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.Log4jSystemConfig;
+import org.apache.samza.config.ShellCommandConfig;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.serializers.model.SamzaObjectMapper;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemFactory;
+import org.apache.samza.system.SystemProducer;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.util.Util;
+
+/**
+ * StreamAppender is a log4j appender that sends logs to the system which is
+ * specified by the user in the Samza config. It can send to any system as long
+ * as the system is defined appropriately in the config.
+ */
+public class StreamAppender extends AppenderSkeleton {
+
+  private final String JAVA_OPTS_CONTAINER_NAME = "samza.container.name";
+  private final String APPLICATION_MASTER_TAG = "samza-application-master";
+  private final String SOURCE = "log4j-log";
+  private Config config = null;
+  private SystemStream systemStream = null;
+  private SystemProducer systemProducer = null;
+  private String key = null;
+  private String streamName = null;
+  private boolean isApplicationMaster = false;
+  private Logger log = Logger.getLogger(StreamAppender.class);
+
+  /**
+   * used to detect if append is being called recursively on this thread
+   */
+  private final ThreadLocal<Boolean> recursiveCall = new 
ThreadLocal<Boolean>() {
+    @Override
+    protected Boolean initialValue() {
+      return false;
+    }
+  };
+
+  public String getStreamName() {
+    return this.streamName;
+  }
+
+  public void setStreamName(String streamName) {
+    this.streamName = streamName;
+  }
+
+  @Override
+  public void activateOptions() {
+    String containerName = System.getProperty(JAVA_OPTS_CONTAINER_NAME);
+    isApplicationMaster = containerName.contains(APPLICATION_MASTER_TAG);
+    key = containerName; // use the container name as the key for the logs
+    config = getConfig();
+    SystemFactory systemFactory = null;
+    Log4jSystemConfig log4jSystemConfig = new Log4jSystemConfig(config);
+
+    if (streamName == null) {
+      streamName = getStreamName(log4jSystemConfig.getJobName(), 
log4jSystemConfig.getJobId());
+    }
+
+    String systemName = log4jSystemConfig.getSystemName();
+    String systemFactoryName = log4jSystemConfig.getSystemFactory(systemName);
+    if (systemFactoryName != null) {
+      systemFactory = Util.<SystemFactory> getObj(systemFactoryName);
+    } else {
+      throw new SamzaException("Please define log4j system name and factory 
class");
+    }
+
+    systemProducer = systemFactory.getProducer(systemName, config, new 
MetricsRegistryMap());
+    systemStream = new SystemStream(systemName, streamName);
+    systemProducer.register(SOURCE);
+    systemProducer.start();
+
+    log.info(SOURCE + " has been registered in " + systemName + ". So all the 
logs will be sent to " + streamName
+        + " in " + systemName + ". Logs are partitioned by " + key);
+  }
+
+  @Override
+  protected void append(LoggingEvent event) {
+    if (!recursiveCall.get()) {
+      try {
+        recursiveCall.set(true);
+        OutgoingMessageEnvelope outgoingMessageEnvelope =
+            new OutgoingMessageEnvelope(systemStream, key.getBytes("UTF-8"), 
subAppend(event).getBytes("UTF-8"));
+        systemProducer.send(SOURCE, outgoingMessageEnvelope);
+      } catch (UnsupportedEncodingException e) {
+        throw new SamzaException("can not send the log messages", e);
+      } finally {
+        recursiveCall.set(false);
+      }
+    }
+  }
+
+  private String subAppend(LoggingEvent event) {
+    if (this.layout == null) {
+      return event.getRenderedMessage();
+    } else {
+      return this.layout.format(event).trim();
+    }
+  }
+
+  @Override
+  public void close() {
+    if (!this.closed) {
+      this.closed = true;
+      flushSystemProducer();
+      systemProducer.stop();
+    }
+  }
+
+  @Override
+  public boolean requiresLayout() {
+    return false;
+  }
+
+  /**
+   * force the system producer to flush the messages
+   */
+  public void flushSystemProducer() {
+    systemProducer.flush(SOURCE);
+  }
+
+  /**
+   * get the config for the AM or containers based on the containers' names.
+   *
+   * @return Config the config of this container
+   */
+  protected Config getConfig() {
+    Config config = null;
+
+    try {
+      if (isApplicationMaster) {
+        config = 
SamzaObjectMapper.getObjectMapper().readValue(System.getenv(ShellCommandConfig.ENV_CONFIG()),
 Config.class);
+      } else {
+        String url = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL());
+        config = SamzaObjectMapper.getObjectMapper().readValue(Util.read(new 
URL(url), 30000), JobModel.class).getConfig();
+      }
+    } catch (IOException e) {
+      throw new SamzaException("can not read the config", e);
+    }
+
+    return config;
+  }
+
+  private String getStreamName(String jobName, String jobId) {
+    if (jobName == null) {
+      throw new SamzaException("job name is null. Please specify job.name");
+    }
+    if (jobId == null) {
+      jobId = "1";
+    }
+    String streamName = "__samza_" + jobName + "_" + jobId + "_logs";
+    return streamName.replace("-", "_");
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/d8fc65f7/samza-log4j/src/test/java/org/apache/samza/config/TestLog4jSystemConfig.java
----------------------------------------------------------------------
diff --git 
a/samza-log4j/src/test/java/org/apache/samza/config/TestLog4jSystemConfig.java 
b/samza-log4j/src/test/java/org/apache/samza/config/TestLog4jSystemConfig.java
new file mode 100644
index 0000000..64a1e70
--- /dev/null
+++ 
b/samza-log4j/src/test/java/org/apache/samza/config/TestLog4jSystemConfig.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config;
+
+import static org.junit.Assert.*;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+public class TestLog4jSystemConfig {
+
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
+  @Test
+  public void testGetSystemNamesAndGetValue() {
+    Map<String, String> map = new HashMap<String, String>();
+    map.put("systems.system1.samza.factory","1");
+    map.put("systems.system2.samza.factory","2");
+    Log4jSystemConfig log4jSystemConfig = new Log4jSystemConfig(new 
MapConfig(map));
+
+    assertEquals(2, log4jSystemConfig.getSystemNames().size());
+    assertEquals("1", 
log4jSystemConfig.getValue("systems.system1.samza.factory"));
+  }
+
+  @Test
+  public void testGetLog4jSystemName() {
+    Map<String, String> map = new HashMap<String, String>();
+    map.put("task.log4j.system", "log4j-system");
+    map.put("systems.system1.samza.factory","1");
+
+    Log4jSystemConfig log4jSystemConfig = new Log4jSystemConfig(new 
MapConfig(map));
+    assertEquals("log4j-system", log4jSystemConfig.getSystemName());
+
+    // use the default system name
+    map.remove("task.log4j.system");
+    log4jSystemConfig = new Log4jSystemConfig(new MapConfig(map));
+    assertEquals("system1", log4jSystemConfig.getSystemName());
+
+    // throw ConfigException
+    map.put("systems.system2.samza.factory", "2");
+    log4jSystemConfig = new Log4jSystemConfig(new MapConfig(map));
+    exception.expect(ConfigException.class);
+    log4jSystemConfig.getSystemName();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/d8fc65f7/samza-log4j/src/test/java/org/apache/samza/logging/log4j/MockSystemFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-log4j/src/test/java/org/apache/samza/logging/log4j/MockSystemFactory.java
 
b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/MockSystemFactory.java
new file mode 100644
index 0000000..cdc1245
--- /dev/null
+++ 
b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/MockSystemFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.logging.log4j;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemFactory;
+import org.apache.samza.system.SystemProducer;
+
+public class MockSystemFactory implements SystemFactory {
+
+  @Override
+  public SystemConsumer getConsumer(String systemName, Config config, 
MetricsRegistry registry) {
+    return null;
+  }
+
+  @Override
+  public SystemProducer getProducer(String systemName, Config config, 
MetricsRegistry registry) {
+    return new MockSystemProducer();
+  }
+
+  @Override
+  public SystemAdmin getAdmin(String systemName, Config config) {
+    return null;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/d8fc65f7/samza-log4j/src/test/java/org/apache/samza/logging/log4j/MockSystemProducer.java
----------------------------------------------------------------------
diff --git 
a/samza-log4j/src/test/java/org/apache/samza/logging/log4j/MockSystemProducer.java
 
b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/MockSystemProducer.java
new file mode 100644
index 0000000..8d99094
--- /dev/null
+++ 
b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/MockSystemProducer.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.logging.log4j;
+
+import java.util.ArrayList;
+
+import org.apache.log4j.Logger;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemProducer;
+
+public class MockSystemProducer implements SystemProducer {
+  static public ArrayList<Object> messagesReceived = new ArrayList<Object>();
+  static private Logger log = Logger.getLogger(MockSystemProducer.class);
+
+  @Override
+  public void start() {
+    log.info("mock system producer is started...");
+  }
+
+  @Override
+  public void stop() {
+  }
+
+  @Override
+  public void register(String source) {
+  }
+
+  @Override
+  public void send(String source, OutgoingMessageEnvelope envelope) {
+    messagesReceived.add(envelope.getMessage());
+  }
+
+  @Override
+  public void flush(String source) {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/d8fc65f7/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java
----------------------------------------------------------------------
diff --git 
a/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java
 
b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java
new file mode 100644
index 0000000..46e4b8c
--- /dev/null
+++ 
b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.logging.log4j;
+
+import static org.junit.Assert.*;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.log4j.Logger;
+import org.apache.log4j.PatternLayout;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.junit.Test;
+
+public class TestStreamAppender {
+
+  static Logger log = Logger.getLogger(TestStreamAppender.class);
+
+  @Test
+  public void testSystemProducerAppender() {
+    System.setProperty("samza.container.name", "samza-container-1");
+
+    MockSystemProducerAppender systemProducerAppender = new 
MockSystemProducerAppender();
+    PatternLayout layout = new PatternLayout();
+    layout.setConversionPattern("%m");
+    systemProducerAppender.setLayout(layout);
+    systemProducerAppender.activateOptions();
+    log.addAppender(systemProducerAppender);
+    log.info("testing");
+    log.info("testing2");
+
+    systemProducerAppender.flushSystemProducer();
+
+    assertEquals(2, MockSystemProducer.messagesReceived.size());
+    assertEquals("testing", new 
String((byte[])MockSystemProducer.messagesReceived.get(0)));
+    assertEquals("testing2", new 
String((byte[])MockSystemProducer.messagesReceived.get(1)));
+  }
+
+  /**
+   * a mock class which overrides the getConfig method in StreamAppender
+   * for testing purposes, because the environment variable where the config
+   * stays is difficult to test.
+   */
+  class MockSystemProducerAppender extends StreamAppender {
+    @Override
+    protected Config getConfig() {
+      Map<String, String> map = new HashMap<String, String>();
+      map.put("job.name", "log4jTest");
+      map.put("systems.mock.samza.factory", 
MockSystemFactory.class.getCanonicalName());
+      return new MapConfig(map);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/d8fc65f7/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala 
b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
index 91aff3c..a1dbe04 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
@@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils
 import org.apache.samza.config.MapConfig
 import org.apache.samza.config.Config
 import org.apache.samza.config.ShellCommandConfig
+import org.apache.samza.config.JobConfig.Config2Job
 import org.apache.samza.config.YarnConfig
 import org.apache.samza.config.YarnConfig.Config2Yarn
 import 
org.apache.samza.job.yarn.SamzaAppMasterTaskManager.DEFAULT_CONTAINER_MEM
@@ -38,6 +39,7 @@ import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.util.hadoop.HttpFileSystem
 import org.apache.samza.util.Logging
 import org.apache.samza.serializers.model.SamzaObjectMapper
+import org.apache.samza.SamzaException
 
 /**
  * When YARN executes an application master, it needs a bash command to
@@ -56,6 +58,7 @@ object SamzaAppMaster extends Logging with 
AMRMClientAsync.CallbackHandler {
   var storedException: Throwable = null
 
   def main(args: Array[String]) {
+    putMDC("containerName", "samza-application-master")
     val containerIdStr = 
System.getenv(ApplicationConstants.Environment.CONTAINER_ID.toString)
     info("got container id: %s" format containerIdStr)
     val containerId = ConverterUtils.toContainerId(containerIdStr)
@@ -69,6 +72,8 @@ object SamzaAppMaster extends Logging with 
AMRMClientAsync.CallbackHandler {
     info("got node manager http port: %s" format nodeHttpPortString)
     val config = new 
MapConfig(SamzaObjectMapper.getObjectMapper.readValue(System.getenv(ShellCommandConfig.ENV_CONFIG),
 classOf[Config]))
     info("got config: %s" format config)
+    putMDC("jobName", config.getName.getOrElse(throw new SamzaException("can 
not find the job name")))
+    putMDC("jobId", config.getJobId.getOrElse("1"))
     val hConfig = new YarnConfiguration
     hConfig.set("fs.http.impl", classOf[HttpFileSystem].getName)
     val interval = 
config.getAMPollIntervalMs.getOrElse(DEFAULT_POLL_INTERVAL_MS)

Reply via email to