Repository: samza
Updated Branches:
  refs/heads/master 1dfc5cecd -> 810d8bd80


SAMZA-1460: StreamAppender does not explicitly create logging topic

Creates the StreamAppender stream explicitly instead of relying on auto stream 
creation.

Author: Daniel Nishimura <dnishim...@gmail.com>

Reviewers: Prateek M <pmahe...@linkedin.com>

Closes #423 from dnishimura/samza-1460-streamappender-create-logging-topic


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/810d8bd8
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/810d8bd8
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/810d8bd8

Branch: refs/heads/master
Commit: 810d8bd805f386f79ce73efdee0d2ef341a65e83
Parents: 1dfc5ce
Author: Daniel Nishimura <dnishim...@gmail.com>
Authored: Mon Mar 5 16:02:49 2018 -0800
Committer: Jagadish <jvenkatra...@linkedin.com>
Committed: Mon Mar 5 16:02:49 2018 -0800

----------------------------------------------------------------------
 .../documentation/versioned/jobs/logging.md     |  4 +-
 .../org/apache/samza/system/StreamSpec.java     |  7 ++
 .../samza/logging/log4j/StreamAppender.java     | 48 +++++++++++++
 .../samza/logging/log4j/MockSystemAdmin.java    | 74 ++++++++++++++++++++
 .../samza/logging/log4j/MockSystemFactory.java  |  2 +-
 .../samza/logging/log4j/TestStreamAppender.java | 35 +++++++++
 6 files changed, 168 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/810d8bd8/docs/learn/documentation/versioned/jobs/logging.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/logging.md b/docs/learn/documentation/versioned/jobs/logging.md
index 44eeb3c..ffb66dd 100644
--- a/docs/learn/documentation/versioned/jobs/logging.md
+++ b/docs/learn/documentation/versioned/jobs/logging.md
@@ -116,12 +116,14 @@ And then updating your log4j.xml to include the appender:
 
 #### Stream Log4j Appender
 
-Samza provides a StreamAppender to publish the logs into a specific system. You can specify the system name using "task.log4j.system" and change name of log stream with param 'StreamName'. Also, we have the [MDC](http://logback.qos.ch/manual/mdc.html) keys "containerName", "jobName" and "jobId", which help identify the source of the log. In order to use this appender, simply add:
+Samza provides a StreamAppender to publish the logs into a specific system. You can specify the system name using "task.log4j.system" and change name of log stream with param 'StreamName'. You can also specify the number of partitions for the log stream with param 'PartitionCount'; otherwise, the number of partitions will equal the number of containers configured for the job. The partition count is set upon the creation of the logging stream and changing the partition count requires manual intervention with the system stream. The [MDC](http://logback.qos.ch/manual/mdc.html) contains the keys "containerName", "jobName" and "jobId", which help identify the source of the log. In order to use this appender, add:
 
 {% highlight xml %}
 <appender name="StreamAppender" class="org.apache.samza.logging.log4j.StreamAppender">
    <!-- optional -->
    <param name="StreamName" value="EpicStreamName"/>
+   <!-- optional -->
+   <param name="PartitionCount" value="8"/>
    <layout class="org.apache.log4j.PatternLayout">
      <param name="ConversionPattern" value="%X{containerName} %X{jobName} %X{jobId} %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} [%p] %m%n" />
    </layout>

http://git-wip-us.apache.org/repos/asf/samza/blob/810d8bd8/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java b/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
index 3a005c1..ce67d8d 100644
--- a/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
+++ b/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
@@ -46,6 +46,9 @@ public class StreamSpec {
   // Internal checkpoint stream id. It is used for creating checkpoint StreamSpec.
   private static final String CHECKPOINT_STREAM_ID = "samza-internal-checkpoint-stream-id";
 
+  // Internal stream appender stream id. It is used for creating stream appender StreamSpec.
+  private static final String STREAM_APPENDER_ID = "samza-internal-stream-appender-stream-id";
+
   /**
    * Unique identifier for the stream in a Samza application.
    * This identifier is used as a key for stream properties in the
@@ -289,4 +292,8 @@ public class StreamSpec {
   public static StreamSpec createCheckpointStreamSpec(String physicalName, String systemName) {
     return new StreamSpec(CHECKPOINT_STREAM_ID, physicalName, systemName, 1);
   }
+
+  public static StreamSpec createStreamAppenderStreamSpec(String physicalName, String systemName, int partitionCount) {
+    return new StreamSpec(STREAM_APPENDER_ID, physicalName, systemName, partitionCount);
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/810d8bd8/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
index 5f41959..9ea169d 100644
--- a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
+++ b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
@@ -32,6 +32,7 @@ import org.apache.log4j.Logger;
 import org.apache.log4j.spi.LoggingEvent;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.Log4jSystemConfig;
 import org.apache.samza.config.SerializerConfig;
 import org.apache.samza.config.ShellCommandConfig;
@@ -44,6 +45,8 @@ import org.apache.samza.serializers.Serde;
 import org.apache.samza.serializers.SerdeFactory;
 import org.apache.samza.serializers.model.SamzaObjectMapper;
 import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.StreamSpec;
+import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemFactory;
 import org.apache.samza.system.SystemProducer;
 import org.apache.samza.system.SystemStream;
@@ -70,6 +73,7 @@ public class StreamAppender extends AppenderSkeleton {
   private SystemProducer systemProducer = null;
   private String key = null;
   private String streamName = null;
+  private int partitionCount = 0;
   private boolean isApplicationMaster = false;
   private Serde<LoggingEvent> serde = null;
   private Logger log = Logger.getLogger(StreamAppender.class);
@@ -85,14 +89,48 @@ public class StreamAppender extends AppenderSkeleton {
    */
   private final AtomicBoolean recursiveCall = new AtomicBoolean(false);
 
+  /**
+   * Getter for the StreamName parameter. See also {@link #activateOptions()} for when this is called.
+   * Example: {@literal <param name="StreamName" value="ExampleStreamName"/>}
+   * @return The configured stream name.
+   */
   public String getStreamName() {
     return this.streamName;
   }
 
+  /**
+   * Setter for the StreamName parameter. See also {@link #activateOptions()} for when this is called.
+   * Example: {@literal <param name="StreamName" value="ExampleStreamName"/>}
+   * @param streamName The configured stream name.
+   */
   public void setStreamName(String streamName) {
     this.streamName = streamName;
   }
 
+  /**
+   * Getter for the number of partitions to create on a new StreamAppender stream. See also {@link #activateOptions()} for when this is called.
+   * Example: {@literal <param name="PartitionCount" value="4"/>}
+   * @return The configured partition count of the StreamAppender stream. If not set, returns {@link JobConfig#getContainerCount()}.
+   */
+  public int getPartitionCount() {
+    if (partitionCount > 0) {
+      return partitionCount;
+    }
+    return new JobConfig(getConfig()).getContainerCount();
+  }
+
+  /**
+   * Setter for the number of partitions to create on a new StreamAppender stream. See also {@link #activateOptions()} for when this is called.
+   * Example: {@literal <param name="PartitionCount" value="4"/>}
+   * @param partitionCount Configurable partition count.
+   */
+  public void setPartitionCount(int partitionCount) {
+    this.partitionCount = partitionCount;
+  }
+
+  /**
+   * Additional configurations needed before logging to stream. Called once in the container before the first log event is sent.
+   */
   @Override
   public void activateOptions() {
     String containerName = System.getProperty(JAVA_OPTS_CONTAINER_NAME);
@@ -260,6 +298,16 @@ public class StreamAppender extends AppenderSkeleton {
 
     setSerde(log4jSystemConfig, systemName, streamName);
 
+    // Explicitly create stream appender stream with the partition count the same as the number of containers.
+    System.out.println("[StreamAppender] creating stream " + streamName + " with partition count " + getPartitionCount());
+    StreamSpec streamSpec = StreamSpec.createStreamAppenderStreamSpec(streamName, systemName, getPartitionCount());
+
+    // SystemAdmin only needed for stream creation here.
+    SystemAdmin systemAdmin = systemFactory.getAdmin(systemName, config);
+    systemAdmin.start();
+    systemAdmin.createStream(streamSpec);
+    systemAdmin.stop();
+
     systemProducer = systemFactory.getProducer(systemName, config, metricsRegistry);
     systemStream = new SystemStream(systemName, streamName);
     systemProducer.register(SOURCE);

http://git-wip-us.apache.org/repos/asf/samza/blob/810d8bd8/samza-log4j/src/test/java/org/apache/samza/logging/log4j/MockSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/test/java/org/apache/samza/logging/log4j/MockSystemAdmin.java b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/MockSystemAdmin.java
new file mode 100644
index 0000000..5c0e526
--- /dev/null
+++ b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/MockSystemAdmin.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.logging.log4j;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.system.StreamSpec;
+import org.apache.samza.system.StreamValidationException;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+
+
+public class MockSystemAdmin implements SystemAdmin {
+  public static String createdStreamName = "";
+
+  @Override
+  public void start() {
+
+  }
+
+  @Override
+  public void stop() {
+
+  }
+
+  @Override
+  public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets) {
+    return null;
+  }
+
+  @Override
+  public Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames) {
+    return null;
+  }
+
+  @Override
+  public Integer offsetComparator(String offset1, String offset2) {
+    return null;
+  }
+
+  @Override
+  public boolean createStream(StreamSpec streamSpec) {
+    createdStreamName = streamSpec.getPhysicalName();
+    return true;
+  }
+
+  @Override
+  public void validateStream(StreamSpec streamSpec) throws StreamValidationException {
+
+  }
+
+  @Override
+  public boolean clearStream(StreamSpec streamSpec) {
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/810d8bd8/samza-log4j/src/test/java/org/apache/samza/logging/log4j/MockSystemFactory.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/test/java/org/apache/samza/logging/log4j/MockSystemFactory.java b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/MockSystemFactory.java
index cdc1245..1d7e782 100644
--- a/samza-log4j/src/test/java/org/apache/samza/logging/log4j/MockSystemFactory.java
+++ b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/MockSystemFactory.java
@@ -40,6 +40,6 @@ public class MockSystemFactory implements SystemFactory {
 
   @Override
   public SystemAdmin getAdmin(String systemName, Config config) {
-    return null;
+    return new MockSystemAdmin();
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/810d8bd8/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java
index d93c5d1..1257835 100644
--- a/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java
+++ b/samza-log4j/src/test/java/org/apache/samza/logging/log4j/TestStreamAppender.java
@@ -37,6 +37,7 @@ import org.apache.samza.logging.log4j.serializers.LoggingEventJsonSerde;
 import org.apache.samza.logging.log4j.serializers.LoggingEventStringSerde;
 import org.apache.samza.logging.log4j.serializers.LoggingEventStringSerdeFactory;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Test;
 
 public class TestStreamAppender {
@@ -118,6 +119,40 @@ public class TestStreamAppender {
   }
 
   @Test
+  public void testStreamCreationUponSetup() {
+    System.setProperty("samza.container.name", "samza-container-1");
+
+    MockSystemProducerAppender systemProducerAppender = new MockSystemProducerAppender();
+    PatternLayout layout = new PatternLayout();
+    layout.setConversionPattern("%m");
+    systemProducerAppender.setLayout(layout);
+    systemProducerAppender.activateOptions();
+    log.addAppender(systemProducerAppender);
+
+    systemProducerAppender.setupSystem();
+    Assert.assertEquals("__samza_log4jTest_1_logs", MockSystemAdmin.createdStreamName);
+  }
+
+  @Test
+  public void testDefaultPartitionCount() {
+    MockSystemProducerAppender systemProducerAppender = new MockSystemProducerAppender();
+    Assert.assertEquals(1, systemProducerAppender.getPartitionCount()); // job.container.count defaults to 1
+
+    Map<String, String> map = new HashMap<>();
+    map.put("job.name", "log4jTest");
+    map.put("job.id", "1");
+    map.put("systems.mock.samza.factory", MockSystemFactory.class.getCanonicalName());
+    map.put("task.log4j.system", "mock");
+    map.put("job.container.count", "4");
+    systemProducerAppender = new MockSystemProducerAppender(new MapConfig(map));
+    Assert.assertEquals(4, systemProducerAppender.getPartitionCount());
+
+    systemProducerAppender = new MockSystemProducerAppender();
+    systemProducerAppender.setPartitionCount(8);
+    Assert.assertEquals(8, systemProducerAppender.getPartitionCount());
+  }
+
+  @Test
   public void testExceptionsDoNotKillTransferThread() throws InterruptedException {
     System.setProperty("samza.container.name", "samza-container-1");
 

Reply via email to