SAMZA-1096: StreamSpec constructors in the ExecutionEnvironments

Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/674e5231
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/674e5231
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/674e5231

Branch: refs/heads/samza-fluent-api-v1
Commit: 674e5231bec56a00a274be46ea249cb5b3962f0e
Parents: ea37b74
Author: Jacob Maes <jm...@linkedin.com>
Authored: Thu Feb 23 10:57:32 2017 -0800
Committer: Jacob Maes <jm...@linkedin.com>
Committed: Thu Feb 23 10:58:12 2017 -0800

----------------------------------------------------------------------
 .../samza/system/ExecutionEnvironment.java      |   9 +-
 .../org/apache/samza/system/StreamProvider.java |  78 +++++
 .../org/apache/samza/system/StreamSpec.java     |  22 +-
 .../system/AbstractExecutionEnvironment.java    |  67 ++++
 .../system/RemoteExecutionEnvironment.java      |   6 +-
 .../system/StandaloneExecutionEnvironment.java  |   6 +-
 .../org/apache/samza/config/JobConfig.scala     |  11 +
 .../org/apache/samza/config/StreamConfig.scala  |  73 ++++-
 .../TestAbstractExecutionEnvironment.java       | 308 +++++++++++++++++++
 .../org/apache/samza/config/KafkaConfig.scala   |   8 +-
 10 files changed, 567 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/674e5231/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java b/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java
index ad37eb3..ef46626 100644
--- a/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java
+++ b/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java
@@ -18,6 +18,7 @@
  */
 package org.apache.samza.system;
 
+import java.lang.reflect.Constructor;
 import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.config.ConfigException;
 import org.apache.samza.operators.StreamGraphBuilder;
@@ -28,7 +29,7 @@ import org.apache.samza.config.Config;
  * Interface to be implemented by physical execution engine to deploy the config and jobs to run the {@link org.apache.samza.operators.StreamGraph}
  */
 @InterfaceStability.Unstable
-public interface ExecutionEnvironment {
+public interface ExecutionEnvironment extends StreamProvider {
 
   String ENVIRONMENT_CONFIG = "job.execution.environment.class";
   String DEFAULT_ENVIRONMENT_CLASS = "org.apache.samza.system.StandaloneExecutionEnvironment";
@@ -51,8 +52,10 @@ public interface ExecutionEnvironment {
    */
   static ExecutionEnvironment fromConfig(Config config) {
     try {
-      if (ExecutionEnvironment.class.isAssignableFrom(Class.forName(config.get(ENVIRONMENT_CONFIG, DEFAULT_ENVIRONMENT_CLASS)))) {
-        return (ExecutionEnvironment) Class.forName(config.get(ENVIRONMENT_CONFIG, DEFAULT_ENVIRONMENT_CLASS)).newInstance();
+      Class<?> environmentClass = Class.forName(config.get(ENVIRONMENT_CONFIG, DEFAULT_ENVIRONMENT_CLASS));
+      if (ExecutionEnvironment.class.isAssignableFrom(environmentClass)) {
+        Constructor<?> constructor = environmentClass.getConstructor(Config.class); // implementations must declare a public constructor taking the job Config
+        return (ExecutionEnvironment) constructor.newInstance(config);
       }
     } catch (Exception e) {
-      throw new ConfigException(String.format("Problem in loading ExecutionEnvironment class %s", config.get(ENVIRONMENT_CONFIG)), e);
+      throw new ConfigException(String.format("Problem in loading ExecutionEnvironment class %s", config.get(ENVIRONMENT_CONFIG, DEFAULT_ENVIRONMENT_CLASS)), e);

http://git-wip-us.apache.org/repos/asf/samza/blob/674e5231/samza-api/src/main/java/org/apache/samza/system/StreamProvider.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/StreamProvider.java b/samza-api/src/main/java/org/apache/samza/system/StreamProvider.java
new file mode 100644
index 0000000..62c2ec4
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/system/StreamProvider.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system;
+
+/**
+ * Describes the common interface for classes that construct instances of {@link StreamSpec}.
+ */
+public interface StreamProvider {
+
+  /**
+   * Constructs a {@link StreamSpec} from the configuration for the specified streamId.
+   *
+   * The stream configurations are read from the following properties in the config:
+   * {@code streams.{streamId}.*}
+   * <br>
+   * All properties matching this pattern are assumed to be system-specific with two exceptions. The following two
+   * properties are Samza properties which are used to bind the stream to a system and a physical resource on that system.
+   *
+   * <ul>
+   *   <li>system -       The name of the System on which this stream will be used. If this property isn't defined
+   *                      the stream will be associated with the System defined in {@code job.default.system}</li>
+   *   <li>physicalName - The system-specific name for this stream. It could be a file URN, topic name, or other identifier. If this property isn't defined the streamId is used.</li>
+   * </ul>
+   *
+   * @param streamId  The logical identifier for the stream in Samza.
+   * @return          The {@link StreamSpec} instance.
+   */
+  StreamSpec streamFromConfig(String streamId);
+
+  /**
+   * Constructs a {@link StreamSpec} from the configuration for the specified streamId.
+   *
+   * The stream configurations are read from the following properties in the config:
+   * {@code streams.{streamId}.*}
+   * <br>
+   * All properties matching this pattern are assumed to be system-specific with one exception. The following
+   * property is a Samza property which is used to bind the stream to a system.
+   *
+   * <ul>
+   *   <li>system -       The name of the System on which this stream will be used. If this property isn't defined
+   *                      the stream will be associated with the System defined in {@code job.default.system}</li>
+   * </ul>
+   *
+   * @param streamId      The logical identifier for the stream in Samza.
+   * @param physicalName  The system-specific name for this stream. It could be a file URN, topic name, or other identifier.
+   * @return              The {@link StreamSpec} instance.
+   */
+  StreamSpec streamFromConfig(String streamId, String physicalName);
+
+  /**
+   * Constructs a {@link StreamSpec} from the configuration for the specified streamId.
+   *
+   * The stream configurations are read from the following properties in the config:
+   * {@code streams.{streamId}.*}
+   *
+   * @param streamId      The logical identifier for the stream in Samza.
+   * @param physicalName  The system-specific name for this stream. It could be a file URN, topic name, or other identifier.
+   * @param systemName    The name of the System on which this stream will be used.
+   * @return              The {@link StreamSpec} instance.
+   */
+  StreamSpec streamFromConfig(String streamId, String physicalName, String systemName);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/674e5231/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java b/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
index d8a2144..3bd0076 100644
--- a/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
+++ b/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
@@ -137,16 +137,11 @@ public class StreamSpec {
    * @param config          A map of properties for the stream. These may be System-specfic.
    */
   public StreamSpec(String id, String physicalName, String systemName, int partitionCount,  Map<String, String> config) {
-    if (id == null) {
-      throw new NullPointerException("Parameter 'id' must not be null");
-    }
-
-    if (systemName == null) {
-      throw new NullPointerException("Parameter 'systemName' must not be null");
-    }
+    validateLogicalIdentifier("id", id);
+    validateLogicalIdentifier("systemName", systemName);
 
     if (partitionCount < 1) {
-      throw new NullPointerException("Parameter 'partitionCount' must not be greater than 0");
+      throw new IllegalArgumentException("Parameter 'partitionCount' must be greater than 0");
     }
 
     this.id = id;
@@ -200,4 +195,15 @@ public class StreamSpec {
   public String getOrDefault(String propertyName, String defaultValue) {
     return config.getOrDefault(propertyName, defaultValue);
   }
+
+  // Logical identifiers are used as config key segments, so they must be non-null and match [A-Za-z0-9_-]+.
+  // Throws NullPointerException for a null value and IllegalArgumentException for a non-matching (incl. empty) value.
+  private void validateLogicalIdentifier(String identifierName, String identifierValue) {
+    if (identifierValue == null) {
+      throw new NullPointerException(String.format("Parameter '%s' must not be null", identifierName));
+    }
+    if (!identifierValue.matches("[A-Za-z0-9_-]+")) {
+      throw new IllegalArgumentException(String.format("Identifier '%s' with value '%s' must match the expression [A-Za-z0-9_-]+", identifierName, identifierValue));
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/674e5231/samza-core/src/main/java/org/apache/samza/system/AbstractExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/system/AbstractExecutionEnvironment.java b/samza-core/src/main/java/org/apache/samza/system/AbstractExecutionEnvironment.java
new file mode 100644
index 0000000..c066bdd
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/system/AbstractExecutionEnvironment.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system;
+
+import java.util.Map;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.StreamConfig;
+
+/**
+ * Base {@link ExecutionEnvironment} that implements {@link StreamProvider} by reading stream definitions
+ * from the job {@link Config}. Each overload resolves the values the caller did not supply and delegates
+ * to the next one.
+ */
+public abstract class AbstractExecutionEnvironment implements ExecutionEnvironment {
+
+  private final Config config;
+
+  /**
+   * Creates an environment that reads stream definitions from the given config.
+   *
+   * @param config the job configuration; must not be null.
+   * @throws NullPointerException if config is null.
+   */
+  public AbstractExecutionEnvironment(Config config) {
+    if (config == null) {
+      throw new NullPointerException("Parameter 'config' must not be null");
+    }
+    this.config = config;
+  }
+
+  @Override
+  public StreamSpec streamFromConfig(String streamId) {
+    // Default the physical name to the streamId when streams.{streamId}.physicalName is not set.
+    String physicalName = new StreamConfig(config).getPhysicalName(streamId, streamId);
+    return streamFromConfig(streamId, physicalName);
+  }
+
+  @Override
+  public StreamSpec streamFromConfig(String streamId, String physicalName) {
+    // Use streams.{streamId}.system, falling back to job.default.system; throws ConfigException if neither is set.
+    String system = new StreamConfig(config).getSystem(streamId);
+    return streamFromConfig(streamId, physicalName, system);
+  }
+
+  @Override
+  public StreamSpec streamFromConfig(String streamId, String physicalName, String system) {
+    // The Samza-only keys (system, physicalName) are filtered out; the rest are passed through to the spec.
+    Map<String, String> properties = new StreamConfig(config).getStreamProperties(streamId);
+    return new StreamSpec(streamId, physicalName, system, properties);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/674e5231/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java b/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java
index 1dbc5f4..851c7f3 100644
--- a/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java
+++ b/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java
@@ -33,7 +33,11 @@ import org.slf4j.LoggerFactory;
 /**
  * This class implements the {@link ExecutionEnvironment} that runs the applications in YARN environment
  */
-public class RemoteExecutionEnvironment implements ExecutionEnvironment {
+public class RemoteExecutionEnvironment extends AbstractExecutionEnvironment {
   private static final Logger log = LoggerFactory.getLogger(RemoteExecutionEnvironment.class);
+
+  public RemoteExecutionEnvironment(Config config) {
+    super(config);
+  }
 
   @Override public void run(StreamGraphBuilder app, Config config) {

http://git-wip-us.apache.org/repos/asf/samza/blob/674e5231/samza-core/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java b/samza-core/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java
index f0f6ef2..71d60ef 100644
--- a/samza-core/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java
+++ b/samza-core/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java
@@ -28,7 +28,11 @@ import org.apache.samza.operators.StreamGraphImpl;
 /**
  * This class implements the {@link ExecutionEnvironment} that runs the applications in standalone environment
  */
-public class StandaloneExecutionEnvironment implements ExecutionEnvironment {
+public class StandaloneExecutionEnvironment extends AbstractExecutionEnvironment {
+
+  public StandaloneExecutionEnvironment(Config config) {
+    super(config);
+  }
 
   // TODO: may want to move this to a common base class for all {@link ExecutionEnvironment}
   StreamGraph createGraph(StreamGraphBuilder app, Config config) {

http://git-wip-us.apache.org/repos/asf/samza/blob/674e5231/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index a797ac2..6b1473c 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@ -43,6 +43,8 @@ object JobConfig {
   val SAMZA_FWK_PATH = "samza.fwk.path"
   val SAMZA_FWK_VERSION = "samza.fwk.version"
   val JOB_COORDINATOR_SYSTEM = "job.coordinator.system"
+  val JOB_METADATA_DEFAULT_SYSTEM = "job.metadata.system"
+  val JOB_DEFAULT_SYSTEM = "job.default.system"
   val JOB_CONTAINER_COUNT = "job.container.count"
   val jOB_CONTAINER_THREAD_POOL_SIZE = "job.container.thread.pool.size"
   val JOB_CONTAINER_SINGLE_THREAD_MODE = "job.container.single.thread.mode"
@@ -108,6 +110,15 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging {
   def getCoordinatorSystemName = getOption(JobConfig.JOB_COORDINATOR_SYSTEM).getOrElse(
       throw new ConfigException("Missing job.coordinator.system configuration. Cannot proceed with job execution."))
 
+  /**
+    * Gets the metadata system name from job.metadata.system,
+    * falling back to job.default.system when it is not set.
+    */
+  def getMetadataSystemName: Option[String] = getOption(JobConfig.JOB_METADATA_DEFAULT_SYSTEM).orElse(getDefaultSystem)
+
+  /** Gets the system name from job.default.system, if it is set. */
+  def getDefaultSystem: Option[String] = getOption(JobConfig.JOB_DEFAULT_SYSTEM)
+
   def getContainerCount = {
     getOption(JobConfig.JOB_CONTAINER_COUNT) match {
       case Some(count) => count.toInt

http://git-wip-us.apache.org/repos/asf/samza/blob/674e5231/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
index 0ccc7df..c376681 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
@@ -19,13 +19,23 @@
 
 package org.apache.samza.config
 
+import org.apache.samza.config.JobConfig.Config2Job
+import org.apache.samza.system.SystemStream
 import org.apache.samza.util.Logging
+
 import scala.collection.JavaConversions._
-import org.apache.samza.system.SystemStream
 
 object StreamConfig {
   // stream config constants
   val STREAM_PREFIX = "systems.%s.streams.%s."
+
+  val SYSTEM = "system"
+  val PHYSICAL_NAME = "physicalName"
+  val STREAM_PREFIX_BY_ID = "streams.%s."
+  val SYSTEM_FOR_STREAM_ID = STREAM_PREFIX_BY_ID + SYSTEM
+  val PHYSICAL_NAME_FOR_STREAM_ID = STREAM_PREFIX_BY_ID + PHYSICAL_NAME
+  val SAMZA_STREAM_PROPERTIES = Set(StreamConfig.SYSTEM, StreamConfig.PHYSICAL_NAME)
+
   val MSG_SERDE = STREAM_PREFIX + "samza.msg.serde"
   val KEY_SERDE = STREAM_PREFIX + "samza.key.serde"
   val CONSUMER_RESET_OFFSET = STREAM_PREFIX + "samza.reset.offset"
@@ -79,4 +89,65 @@ class StreamConfig(config: Config) extends ScalaMapConfig(config) with Logging {
         new SystemStream(systemName, streamName)
       }).toSet
   }
+
+  /**
+    * Gets the stream properties from the legacy config style:
+    * systems.{system}.streams.{stream}.*
+    *
+    * @param systemName the system name under which the properties are configured
+    * @param streamName the stream name
+    * @return           the map of properties for the stream
+    */
+  def getSystemStreamProperties(systemName: String, streamName: String) = {
+    config.subset(StreamConfig.STREAM_PREFIX format(systemName, streamName), true)
+  }
+
+  /**
+    * Gets the properties for the specified streamId from the config.
+    * This method supersedes [[StreamConfig#getSystemStreamProperties]].
+    * It first applies any legacy configs (only when a system can be resolved for the stream) from this config location:
+    * systems.{system}.streams.{stream}.*
+    *
+    * It then overrides them with properties of the new config format:
+    * streams.{streamId}.*
+    *
+    * @param streamId the identifier for the stream in the config.
+    * @return         the merged map of config properties from both the legacy and new config styles
+    */
+  def getStreamProperties(streamId: String) = {
+    val properties = subset(StreamConfig.STREAM_PREFIX_BY_ID format streamId)
+    val inheritedLegacyProperties: java.util.Map[String, String] =
+      getSystemOption(streamId).map(getSystemStreamProperties(_, streamId)).getOrElse(java.util.Collections.emptyMap[String, String]())
+    val filteredStreamProperties: java.util.Map[String, String] = properties.filterKeys(k => !StreamConfig.SAMZA_STREAM_PROPERTIES.contains(k))
+    new MapConfig(java.util.Arrays.asList(inheritedLegacyProperties, filteredStreamProperties))
+  }
+
+  /**
+    * Gets the System associated with the specified streamId.
+    * It first looks for the property streams.{streamId}.system
+    *
+    * If no value was provided, it uses
+    * job.default.system
+    *
+    * @param streamId the identifier for the stream in the config.
+    * @return         the system name associated with the stream.
+    * @throws ConfigException if neither property is set.
+    */
+  def getSystem(streamId: String): String = getSystemOption(streamId).getOrElse(throw new ConfigException(
+    "Missing %s configuration. Cannot bind stream to a system without it.".format(StreamConfig.SYSTEM_FOR_STREAM_ID.format(streamId))))
+
+  /** Same lookup as [[getSystem]], but returns None instead of throwing when no system is configured. */
+  private def getSystemOption(streamId: String): Option[String] =
+    getOption(StreamConfig.SYSTEM_FOR_STREAM_ID format streamId).orElse(config.getDefaultSystem)
+
+  /**
+    * Gets the physical name for the specified streamId.
+    *
+    * @param streamId             the identifier for the stream in the config.
+    * @param defaultPhysicalName  the default to use if the physical name is missing.
+    * @return                     the physical identifier for the stream or the default if it is undefined.
+    */
+  def getPhysicalName(streamId: String, defaultPhysicalName: String) = {
+    get(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID format streamId, defaultPhysicalName)
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/674e5231/samza-core/src/test/java/org/apache/samza/system/TestAbstractExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/system/TestAbstractExecutionEnvironment.java b/samza-core/src/test/java/org/apache/samza/system/TestAbstractExecutionEnvironment.java
new file mode 100644
index 0000000..e547322
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/system/TestAbstractExecutionEnvironment.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigException;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.StreamConfig;
+import org.apache.samza.operators.StreamGraphBuilder;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class TestAbstractExecutionEnvironment {
+  private static final String STREAM_ID = "t3st-Stream_Id";
+  private static final String STREAM_ID_INVALID = "test#Str3amId!";
+
+  private static final String TEST_PHYSICAL_NAME = "t3st-Physical_Name";
+  private static final String TEST_PHYSICAL_NAME2 = "testPhysicalName2";
+  private static final String TEST_PHYSICAL_NAME_SPECIAL_CHARS = "test://Physical.Name?";
+
+  private static final String TEST_SYSTEM = "t3st-System_Name";
+  private static final String TEST_SYSTEM2 = "testSystemName2";
+  private static final String TEST_SYSTEM_INVALID = "test:System!Name@";
+
+  private static final String TEST_DEFAULT_SYSTEM = "testDefaultSystemName";
+
+  @Test(expected = NullPointerException.class)
+  public void testConfigValidation() {
+    new TestAbstractExecutionEnvironmentImpl(null);
+  }
+
+  // The physical name should be pulled from the StreamConfig.PHYSICAL_NAME property value.
+  @Test
+  public void testStreamFromConfigWithPhysicalNameInConfig() {
+    Config config = buildStreamConfig(STREAM_ID,
+                                      StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
+                                      StreamConfig.SYSTEM(), TEST_SYSTEM);
+
+    ExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
+    StreamSpec spec = env.streamFromConfig(STREAM_ID);
+
+    assertEquals(TEST_PHYSICAL_NAME, spec.getPhysicalName());
+  }
+
+  // The streamId should be used as the physicalName when the physical name is not specified.
+  // NOTE: its either this, set to null, or exception. This seems better for backward compatibility and API brevity.
+  @Test
+  public void testStreamFromConfigWithoutPhysicalNameInConfig() {
+    Config config = buildStreamConfig(STREAM_ID,
+                                      StreamConfig.SYSTEM(), TEST_SYSTEM);
+
+    ExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
+    StreamSpec spec = env.streamFromConfig(STREAM_ID);
+
+    assertEquals(STREAM_ID, spec.getPhysicalName());
+  }
+
+  // If the system is specified at the stream scope, use it
+  @Test
+  public void testStreamFromConfigWithSystemAtStreamScopeInConfig() {
+    Config config = buildStreamConfig(STREAM_ID,
+                                      StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
+                                      StreamConfig.SYSTEM(), TEST_SYSTEM);
+
+    ExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
+    StreamSpec spec = env.streamFromConfig(STREAM_ID);
+
+    assertEquals(TEST_SYSTEM, spec.getSystemName());
+  }
+
+  // If system isn't specified at stream scope, use the default system
+  @Test
+  public void testStreamFromConfigWithSystemAtDefaultScopeInConfig() {
+    Config config = addConfigs(buildStreamConfig(STREAM_ID,
+                                                 StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME),
+                               JobConfig.JOB_DEFAULT_SYSTEM(), TEST_DEFAULT_SYSTEM);
+
+    ExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
+    StreamSpec spec = env.streamFromConfig(STREAM_ID);
+
+    assertEquals(TEST_DEFAULT_SYSTEM, spec.getSystemName());
+  }
+
+  // Stream scope should override default scope
+  @Test
+  public void testStreamFromConfigWithSystemAtBothScopesInConfig() {
+    Config config = addConfigs(buildStreamConfig(STREAM_ID,
+                                                 StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
+                                                 StreamConfig.SYSTEM(), TEST_SYSTEM),
+                               JobConfig.JOB_DEFAULT_SYSTEM(), TEST_DEFAULT_SYSTEM);
+
+    ExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
+    StreamSpec spec = env.streamFromConfig(STREAM_ID);
+
+    assertEquals(TEST_SYSTEM, spec.getSystemName());
+  }
+
+  // System is required. Throw if it cannot be determined.
+  @Test(expected = ConfigException.class)
+  public void testStreamFromConfigWithOutSystemInConfig() {
+    Config config = buildStreamConfig(STREAM_ID,
+                                      StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME);
+
+    ExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
+
+    // Neither streams.{streamId}.system nor job.default.system is set, so no system can be resolved
+    // and streamFromConfig must throw ConfigException.
+    env.streamFromConfig(STREAM_ID);
+  }
+
+  // The properties in the config "streams.{streamId}.*" should be passed through to the spec.
+  @Test
+  public void testStreamFromConfigPropertiesPassthrough() {
+    Config config = buildStreamConfig(STREAM_ID,
+                                      StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
+                                      StreamConfig.SYSTEM(), TEST_SYSTEM,
+                                      "systemProperty1", "systemValue1",
+                                      "systemProperty2", "systemValue2");
+
+    ExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
+    StreamSpec spec = env.streamFromConfig(STREAM_ID);
+
+    Map<String, String> properties = spec.getConfig();
+    assertEquals(2, properties.size());
+    assertEquals("systemValue1", properties.get("systemProperty1"));
+    assertEquals("systemValue2", properties.get("systemProperty2"));
+    assertEquals("systemValue1", spec.get("systemProperty1"));
+    assertEquals("systemValue2", spec.get("systemProperty2"));
+  }
+
+  // The Samza properties (system, physicalName) are not meant for the underlying system; the spec keys are bare, so check those.
+  @Test
+  public void testStreamFromConfigSamzaPropertiesOmitted() {
+    Config config = buildStreamConfig(STREAM_ID,
+                                      StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
+                                      StreamConfig.SYSTEM(), TEST_SYSTEM,
+                                      "systemProperty1", "systemValue1",
+                                      "systemProperty2", "systemValue2");
+
+    ExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
+    StreamSpec spec = env.streamFromConfig(STREAM_ID);
+
+    Map<String, String> properties = spec.getConfig();
+    assertEquals(2, properties.size());
+    assertNull(properties.get(StreamConfig.PHYSICAL_NAME()));
+    assertNull(properties.get(StreamConfig.SYSTEM()));
+    assertNull(spec.get(StreamConfig.PHYSICAL_NAME()));
+    assertNull(spec.get(StreamConfig.SYSTEM()));
+  }
+
+  // When the physicalName argument is passed explicitly it should be used, regardless of whether it is also in the config
+  @Test
+  public void testStreamFromConfigPhysicalNameArgSimple() {
+    Config config = buildStreamConfig(STREAM_ID,
+                                      StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2, // This should be ignored because of the explicit arg
+                                      StreamConfig.SYSTEM(), TEST_SYSTEM);
+
+    ExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
+    StreamSpec spec = env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME);
+
+    assertEquals(STREAM_ID, spec.getId());
+    assertEquals(TEST_PHYSICAL_NAME, spec.getPhysicalName());
+    assertEquals(TEST_SYSTEM, spec.getSystemName());
+  }
+
+  // Special characters are allowed for the physical name
+  @Test
+  public void testStreamFromConfigPhysicalNameArgSpecialCharacters() {
+    Config config = buildStreamConfig(STREAM_ID,
+                                      StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2,
+                                      StreamConfig.SYSTEM(), TEST_SYSTEM);
+
+    ExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
+    StreamSpec spec = env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME_SPECIAL_CHARS);
+    assertEquals(TEST_PHYSICAL_NAME_SPECIAL_CHARS, spec.getPhysicalName());
+  }
+
+  // Null is allowed for the physical name
+  @Test
+  public void testStreamFromConfigPhysicalNameArgNull() {
+    Config config = buildStreamConfig(STREAM_ID,
+                                      StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2,
+                                      StreamConfig.SYSTEM(), TEST_SYSTEM);
+
+    ExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
+    StreamSpec spec = env.streamFromConfig(STREAM_ID, null);
+    assertNull(spec.getPhysicalName());
+  }
+
+  // When the system name is provided explicitly, it should be used, regardless of whether it's also in the config
+  @Test
+  public void testStreamFromConfigSystemNameArgValid() {
+    Config config = buildStreamConfig(STREAM_ID,
+                                      StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2, // This should be ignored because of the explicit arg
+                                      StreamConfig.SYSTEM(), TEST_SYSTEM2);              // This too
+
+    ExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
+    StreamSpec spec = env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME, TEST_SYSTEM);
+
+    assertEquals(STREAM_ID, spec.getId());
+    assertEquals(TEST_PHYSICAL_NAME, spec.getPhysicalName());
+    assertEquals(TEST_SYSTEM, spec.getSystemName());
+  }
+
+  // Special characters are NOT allowed for the system name, because it's used as an identifier
+  // in the config.
+  //
+  // The expected exception is scoped to the streamFromConfig() call alone. The buildConfig()
+  // fixture helper also throws IllegalArgumentException (on an odd number of key/value
+  // arguments). With @Test(expected = ...), a broken fixture would therefore pass silently.
+  @Test
+  public void testStreamFromConfigSystemNameArgInvalid() {
+    Config config = buildStreamConfig(STREAM_ID,
+        StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2,
+        StreamConfig.SYSTEM(), TEST_SYSTEM2);
+    ExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
+
+    try {
+      env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME, TEST_SYSTEM_INVALID);
+    } catch (IllegalArgumentException expected) {
+      return; // the invalid system name was rejected, as required
+    }
+    throw new AssertionError(
+        "Expected IllegalArgumentException for system name: " + TEST_SYSTEM_INVALID);
+  }
+
+  // A null system name argument must be rejected with a NullPointerException.
+  @Test(expected = NullPointerException.class)
+  public void testStreamFromConfigSystemNameArgNull() {
+    Config streamConfig = buildStreamConfig(STREAM_ID,
+        StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2,
+        StreamConfig.SYSTEM(), TEST_SYSTEM2);
+    ExecutionEnvironment environment = new TestAbstractExecutionEnvironmentImpl(streamConfig);
+
+    environment.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME, null);
+  }
+
+  // Special characters are NOT allowed for streamId, because it's used as an identifier in the
+  // config.
+  //
+  // The expected exception is scoped to the streamFromConfig() call alone. The buildConfig()
+  // fixture helper also throws IllegalArgumentException (on an odd number of key/value
+  // arguments). With @Test(expected = ...), a broken fixture would therefore pass silently.
+  @Test
+  public void testStreamFromConfigStreamIdInvalid() {
+    Config config = buildStreamConfig(STREAM_ID_INVALID,
+        StreamConfig.SYSTEM(), TEST_SYSTEM);
+    ExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
+
+    try {
+      env.streamFromConfig(STREAM_ID_INVALID);
+    } catch (IllegalArgumentException expected) {
+      return; // the invalid stream id was rejected, as required
+    }
+    throw new AssertionError(
+        "Expected IllegalArgumentException for stream id: " + STREAM_ID_INVALID);
+  }
+
+  // A null streamId must be rejected with a NullPointerException.
+  @Test(expected = NullPointerException.class)
+  public void testStreamFromConfigStreamIdNull() {
+    Config streamConfig = buildStreamConfig(null, StreamConfig.SYSTEM(), TEST_SYSTEM);
+    ExecutionEnvironment environment = new TestAbstractExecutionEnvironmentImpl(streamConfig);
+
+    environment.streamFromConfig(null);
+  }
+
+
+  // Helper methods
+
+  /**
+   * Builds a config whose keys are scoped to one stream.
+   *
+   * <p>Each key is prefixed with {@code StreamConfig.STREAM_PREFIX_BY_ID()} formatted with
+   * {@code streamId}. The keys are prefixed on a copy of {@code kvs}, so a caller that passes an
+   * explicit array does not see it modified.
+   *
+   * @param streamId the stream id used to build the key prefix
+   * @param kvs alternating keys and values; keys are given without the stream prefix
+   * @return a config containing the prefixed keys and their values
+   * @throws IllegalArgumentException if {@code kvs} has an odd length (raised by buildConfig)
+   */
+  private Config buildStreamConfig(String streamId, String... kvs) {
+    // The prefix does not depend on the loop index, so compute it once.
+    String prefix = String.format(StreamConfig.STREAM_PREFIX_BY_ID(), streamId);
+    String[] scoped = kvs.clone();
+    for (int i = 0; i < scoped.length - 1; i += 2) {
+      scoped[i] = prefix + scoped[i];
+    }
+    return buildConfig(scoped);
+  }
+
+  /**
+   * Builds a {@link MapConfig} from alternating key/value arguments.
+   *
+   * @param kvs alternating keys and values: key0, value0, key1, value1, ...
+   * @return a config holding every pair; a repeated key keeps its last value
+   * @throws IllegalArgumentException if an odd number of arguments is given
+   */
+  private Config buildConfig(String... kvs) {
+    boolean unpaired = kvs.length % 2 != 0;
+    if (unpaired) {
+      throw new IllegalArgumentException("There must be parity between the keys and values");
+    }
+
+    Map<String, String> entries = new HashMap<>();
+    int idx = 0;
+    while (idx + 1 < kvs.length) {
+      entries.put(kvs[idx], kvs[idx + 1]);
+      idx += 2;
+    }
+    return new MapConfig(entries);
+  }
+
+  /**
+   * Returns a new config holding everything in {@code original} plus the given pairs.
+   *
+   * <p>The given pairs override entries of {@code original} that have the same key.
+   * {@code original} itself is not modified.
+   *
+   * @param original the base config
+   * @param kvs alternating keys and values to add
+   * @return the merged config
+   */
+  private Config addConfigs(Config original, String... kvs) {
+    Config additions = buildConfig(kvs);
+    Map<String, String> merged = new HashMap<>(original);
+    merged.putAll(additions);
+    return new MapConfig(merged);
+  }
+
+  /**
+   * Minimal concrete {@link AbstractExecutionEnvironment} used to exercise the stream-spec
+   * construction logic inherited from the abstract base class.
+   *
+   * <p>Declared {@code static} because it never touches the enclosing test instance. A
+   * non-static inner class would carry a needless hidden reference to it.
+   */
+  private static class TestAbstractExecutionEnvironmentImpl extends AbstractExecutionEnvironment {
+
+    public TestAbstractExecutionEnvironmentImpl(Config config) {
+      super(config);
+    }
+
+    @Override
+    public void run(StreamGraphBuilder graphBuilder, Config config) {
+      // Intentionally a no-op: these tests never execute a stream graph.
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/674e5231/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala 
b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index e355e7e..7e9f18a 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -36,6 +36,7 @@ import java.util
 import scala.collection.JavaConverters._
 import org.apache.samza.system.kafka.KafkaSystemFactory
 import org.apache.samza.config.SystemConfig.Config2System
+import org.apache.samza.config.StreamConfig.Config2Stream
 import org.apache.kafka.common.serialization.ByteArraySerializer
 
 object KafkaConfig {
@@ -165,13 +166,6 @@ class KafkaConfig(config: Config) extends 
ScalaMapConfig(config) {
     kafkaChangeLogProperties
   }
 
-  def getTopicKafkaProperties(systemName: String, streamName: String) = {
-    val filteredConfigs = config.subset(StreamConfig.STREAM_PREFIX 
format(systemName, streamName), true)
-    val topicProperties = new Properties
-    filteredConfigs.foreach { kv => topicProperties.setProperty(kv._1, kv._2) }
-    topicProperties
-  }
-
   // kafka config
   def getKafkaSystemConsumerConfig( systemName: String,
                                     clientId: String,

Reply via email to