SAMZA-1096: StreamSpec constructors in the ExecutionEnvironments
Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/674e5231 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/674e5231 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/674e5231 Branch: refs/heads/samza-fluent-api-v1 Commit: 674e5231bec56a00a274be46ea249cb5b3962f0e Parents: ea37b74 Author: Jacob Maes <jm...@linkedin.com> Authored: Thu Feb 23 10:57:32 2017 -0800 Committer: Jacob Maes <jm...@linkedin.com> Committed: Thu Feb 23 10:58:12 2017 -0800 ---------------------------------------------------------------------- .../samza/system/ExecutionEnvironment.java | 9 +- .../org/apache/samza/system/StreamProvider.java | 78 +++++ .../org/apache/samza/system/StreamSpec.java | 22 +- .../system/AbstractExecutionEnvironment.java | 67 ++++ .../system/RemoteExecutionEnvironment.java | 6 +- .../system/StandaloneExecutionEnvironment.java | 6 +- .../org/apache/samza/config/JobConfig.scala | 11 + .../org/apache/samza/config/StreamConfig.scala | 73 ++++- .../TestAbstractExecutionEnvironment.java | 308 +++++++++++++++++++ .../org/apache/samza/config/KafkaConfig.scala | 8 +- 10 files changed, 567 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/674e5231/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java b/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java index ad37eb3..ef46626 100644 --- a/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java +++ b/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java @@ -18,6 +18,7 @@ */ package org.apache.samza.system; +import java.lang.reflect.Constructor; import org.apache.samza.annotation.InterfaceStability; import 
org.apache.samza.config.ConfigException; import org.apache.samza.operators.StreamGraphBuilder; @@ -28,7 +29,7 @@ import org.apache.samza.config.Config; * Interface to be implemented by physical execution engine to deploy the config and jobs to run the {@link org.apache.samza.operators.StreamGraph} */ @InterfaceStability.Unstable -public interface ExecutionEnvironment { +public interface ExecutionEnvironment extends StreamProvider { String ENVIRONMENT_CONFIG = "job.execution.environment.class"; String DEFAULT_ENVIRONMENT_CLASS = "org.apache.samza.system.StandaloneExecutionEnvironment"; @@ -51,8 +52,10 @@ public interface ExecutionEnvironment { */ static ExecutionEnvironment fromConfig(Config config) { try { - if (ExecutionEnvironment.class.isAssignableFrom(Class.forName(config.get(ENVIRONMENT_CONFIG, DEFAULT_ENVIRONMENT_CLASS)))) { - return (ExecutionEnvironment) Class.forName(config.get(ENVIRONMENT_CONFIG, DEFAULT_ENVIRONMENT_CLASS)).newInstance(); + Class<?> environmentClass = Class.forName(config.get(ENVIRONMENT_CONFIG, DEFAULT_ENVIRONMENT_CLASS)); + if (ExecutionEnvironment.class.isAssignableFrom(environmentClass)) { + Constructor<?> constructor = environmentClass.getConstructor(Config.class); // *sigh* + return (ExecutionEnvironment) constructor.newInstance(config); } } catch (Exception e) { throw new ConfigException(String.format("Problem in loading ExecutionEnvironment class %s", config.get(ENVIRONMENT_CONFIG)), e); http://git-wip-us.apache.org/repos/asf/samza/blob/674e5231/samza-api/src/main/java/org/apache/samza/system/StreamProvider.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/system/StreamProvider.java b/samza-api/src/main/java/org/apache/samza/system/StreamProvider.java new file mode 100644 index 0000000..62c2ec4 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/system/StreamProvider.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation 
(ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.system; + +/** + * Describes the common interface for classes that construct instances of {@link StreamSpec}. + */ +public interface StreamProvider { + + /** + * Constructs a {@link StreamSpec} from the configuration for the specified streamId. + * + * The stream configurations are read from the following properties in the config: + * {@code streams.{$streamId}.*} + * <br> + * All properties matching this pattern are assumed to be system-specific with two exceptions. The following two + * properties are Samza properties which are used to bind the stream to a system and a physical resource on that system. + * + * <ul> + * <li>system - The name of the System on which this stream will be used. If this property isn't defined + * the stream will be associated with the System defined in {@code job.default.system}</li> + * <li>physicalName - The system-specific name for this stream. It could be a file URN, topic name, or other identifier.</li> + * </ul> + * + * @param streamId The logical identifier for the stream in Samza. + * @return The {@link StreamSpec} instance.
+ */ + StreamSpec streamFromConfig(String streamId); + + /** + * Constructs a {@link StreamSpec} from the configuration for the specified streamId. + * + * The stream configurations are read from the following properties in the config: + * {@code streams.{$streamId}.*} + * <br> + * All properties matching this pattern are assumed to be system-specific with one exception. The following + * property is a Samza property which is used to bind the stream to a system. + * + * <ul> + * <li>system - The name of the System on which this stream will be used. If this property isn't defined + * the stream will be associated with the System defined in {@code job.default.system}</li> + * </ul> + * + * @param streamId The logical identifier for the stream in Samza. + * @param physicalName The system-specific name for this stream. It could be a file URN, topic name, or other identifier. + * @return The {@link StreamSpec} instance. + */ + StreamSpec streamFromConfig(String streamId, String physicalName); + + /** + * Constructs a {@link StreamSpec} from the configuration for the specified streamId. + * + * The stream configurations are read from the following properties in the config: + * {@code streams.{$streamId}.*} + * + * @param streamId The logical identifier for the stream in Samza. + * @param physicalName The system-specific name for this stream. It could be a file URN, topic name, or other identifier. + * @param systemName The name of the System on which this stream will be used. + * @return The {@link StreamSpec} instance.
+ */ + StreamSpec streamFromConfig(String streamId, String physicalName, String systemName); +} http://git-wip-us.apache.org/repos/asf/samza/blob/674e5231/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java b/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java index d8a2144..3bd0076 100644 --- a/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java +++ b/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java @@ -137,16 +137,11 @@ public class StreamSpec { * @param config A map of properties for the stream. These may be System-specfic. */ public StreamSpec(String id, String physicalName, String systemName, int partitionCount, Map<String, String> config) { - if (id == null) { - throw new NullPointerException("Parameter 'id' must not be null"); - } - - if (systemName == null) { - throw new NullPointerException("Parameter 'systemName' must not be null"); - } + validateLogicalIdentifier("id", id); + validateLogicalIdentifier("systemName", systemName); if (partitionCount < 1) { - throw new NullPointerException("Parameter 'partitionCount' must not be greater than 0"); + throw new IllegalArgumentException("Parameter 'partitionCount' must be greater than 0"); } this.id = id; @@ -200,4 +195,15 @@ public class StreamSpec { public String getOrDefault(String propertyName, String defaultValue) { return config.getOrDefault(propertyName, defaultValue); } + + private void validateLogicalIdentifier(String identifierName, String identifierValue) { + /*if (identifier == null) { + throw new NullPointerException(); + } else if (identifier.isEmpty()) { + + } else*/ + if (!identifierValue.matches("[A-Za-z0-9_-]+")) { + throw new IllegalArgumentException(String.format("Identifier '%s' must match the expression [A-Za-z0-9_-]+", identifierName)); + } + } } 
http://git-wip-us.apache.org/repos/asf/samza/blob/674e5231/samza-core/src/main/java/org/apache/samza/system/AbstractExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/system/AbstractExecutionEnvironment.java b/samza-core/src/main/java/org/apache/samza/system/AbstractExecutionEnvironment.java new file mode 100644 index 0000000..c066bdd --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/system/AbstractExecutionEnvironment.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.system; + +import java.util.Map; +import org.apache.samza.config.Config; +import org.apache.samza.config.StreamConfig; + + +public abstract class AbstractExecutionEnvironment implements ExecutionEnvironment { + + private final Config config; + + public AbstractExecutionEnvironment(Config config) { + if (config == null) { + throw new NullPointerException(); + } + + this.config = config; + } + + @Override + public StreamSpec streamFromConfig(String streamId) { + StreamConfig streamConfig = new StreamConfig(config); + + String system = streamConfig.getSystem(streamId); + String physicalName = streamConfig.getPhysicalName(streamId, streamId); + Map<String, String> properties = streamConfig.getStreamProperties(streamId); + + return new StreamSpec(streamId, physicalName, system, properties); + } + + @Override + public StreamSpec streamFromConfig(String streamId, String physicalName) { + StreamConfig streamConfig = new StreamConfig(config); + + String system = streamConfig.getSystem(streamId); + Map<String, String> properties = streamConfig.getStreamProperties(streamId); + + return new StreamSpec(streamId, physicalName, system, properties); + } + + @Override + public StreamSpec streamFromConfig(String streamId, String physicalName, String system) { + StreamConfig streamConfig = new StreamConfig(config); + + Map<String, String> properties = streamConfig.getStreamProperties(streamId); + + return new StreamSpec(streamId, physicalName, system, properties); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/674e5231/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java b/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java index 1dbc5f4..851c7f3 100644 --- 
a/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java +++ b/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java @@ -33,7 +33,11 @@ import org.slf4j.LoggerFactory; /** * This class implements the {@link ExecutionEnvironment} that runs the applications in YARN environment */ -public class RemoteExecutionEnvironment implements ExecutionEnvironment { +public class RemoteExecutionEnvironment extends AbstractExecutionEnvironment { + + public RemoteExecutionEnvironment(Config config) { + super(config); + } private static final Logger log = LoggerFactory.getLogger(RemoteExecutionEnvironment.class); @Override public void run(StreamGraphBuilder app, Config config) { http://git-wip-us.apache.org/repos/asf/samza/blob/674e5231/samza-core/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java b/samza-core/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java index f0f6ef2..71d60ef 100644 --- a/samza-core/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java +++ b/samza-core/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java @@ -28,7 +28,11 @@ import org.apache.samza.operators.StreamGraphImpl; /** * This class implements the {@link ExecutionEnvironment} that runs the applications in standalone environment */ -public class StandaloneExecutionEnvironment implements ExecutionEnvironment { +public class StandaloneExecutionEnvironment extends AbstractExecutionEnvironment { + + public StandaloneExecutionEnvironment(Config config) { + super(config); + } // TODO: may want to move this to a common base class for all {@link ExecutionEnvironment} StreamGraph createGraph(StreamGraphBuilder app, Config config) { 
http://git-wip-us.apache.org/repos/asf/samza/blob/674e5231/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala index a797ac2..6b1473c 100644 --- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala +++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala @@ -43,6 +43,8 @@ object JobConfig { val SAMZA_FWK_PATH = "samza.fwk.path" val SAMZA_FWK_VERSION = "samza.fwk.version" val JOB_COORDINATOR_SYSTEM = "job.coordinator.system" + val JOB_METADATA_DEFAULT_SYSTEM = "job.metadata.system" + val JOB_DEFAULT_SYSTEM = "job.default.system" val JOB_CONTAINER_COUNT = "job.container.count" val jOB_CONTAINER_THREAD_POOL_SIZE = "job.container.thread.pool.size" val JOB_CONTAINER_SINGLE_THREAD_MODE = "job.container.single.thread.mode" @@ -108,6 +110,15 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging { def getCoordinatorSystemName = getOption(JobConfig.JOB_COORDINATOR_SYSTEM).getOrElse( throw new ConfigException("Missing job.coordinator.system configuration. 
Cannot proceed with job execution.")) + def getMetadataSystemName = { + getOption(JobConfig.JOB_METADATA_DEFAULT_SYSTEM) match { + case Some(system) => Some(system) + case _ => getDefaultSystem + } + } + + def getDefaultSystem = getOption(JobConfig.JOB_DEFAULT_SYSTEM) + def getContainerCount = { getOption(JobConfig.JOB_CONTAINER_COUNT) match { case Some(count) => count.toInt http://git-wip-us.apache.org/repos/asf/samza/blob/674e5231/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala index 0ccc7df..c376681 100644 --- a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala +++ b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala @@ -19,13 +19,23 @@ package org.apache.samza.config +import org.apache.samza.config.JobConfig.Config2Job +import org.apache.samza.system.SystemStream import org.apache.samza.util.Logging + import scala.collection.JavaConversions._ -import org.apache.samza.system.SystemStream object StreamConfig { // stream config constants val STREAM_PREFIX = "systems.%s.streams.%s." + + val SYSTEM = "system" + val PHYSICAL_NAME = "physicalName" + val STREAM_PREFIX_BY_ID = "streams.%s." 
+ val SYSTEM_FOR_STREAM_ID = STREAM_PREFIX_BY_ID + SYSTEM + val PHYSICAL_NAME_FOR_STREAM_ID = STREAM_PREFIX_BY_ID + PHYSICAL_NAME + val SAMZA_STREAM_PROPERTIES = Set(StreamConfig.SYSTEM, StreamConfig.PHYSICAL_NAME) + val MSG_SERDE = STREAM_PREFIX + "samza.msg.serde" val KEY_SERDE = STREAM_PREFIX + "samza.key.serde" val CONSUMER_RESET_OFFSET = STREAM_PREFIX + "samza.reset.offset" @@ -79,4 +89,65 @@ class StreamConfig(config: Config) extends ScalaMapConfig(config) with Logging { new SystemStream(systemName, streamName) }).toSet } + + /** + * Gets the stream properties from the legacy config style: + * systems.{system}.streams.{streams}.* + * + * @param systemName the system name under which the properties are configured + * @param streamName the stream name + * @return the map of properties for the stream + */ + def getSystemStreamProperties(systemName: String, streamName: String) = { + config.subset(StreamConfig.STREAM_PREFIX format(systemName, streamName), true) + } + + /** + * Gets the properties for the specified streamId from the config. + * This method supercedes {@link StreamConfig.#getSystemStreamProperties} + * It first applies any legacy configs from this config location: + * systems.{system}.streams.{stream}.* + * + * It then overrides them with properties of the new config format: + * streams.{streamId}.* + * + * @param streamId the identifier for the stream in the config. 
+ * @return the merged map of config properties from both the legacy and new config styles + */ + def getStreamProperties(streamId: String) = { + val properties = subset(StreamConfig.STREAM_PREFIX_BY_ID format streamId) + val inheritedLegacyProperties:java.util.Map[String, String] = getSystemStreamProperties(getSystem(streamId), streamId) + val filteredStreamProperties:java.util.Map[String, String] = properties.filterKeys(k => !StreamConfig.SAMZA_STREAM_PROPERTIES.contains(k)) + new MapConfig(java.util.Arrays.asList(inheritedLegacyProperties, filteredStreamProperties)) + } + + /** + * Gets the System associated with the specified streamId. + * It first looks for the property + * streams.{streamId}.system + * + * If no value was provided, it uses + * job.default.system + * + * @param streamId the identifier for the stream in the config. + * @return the system name associated with the stream. + */ + def getSystem(streamId: String) = { + getOption(StreamConfig.SYSTEM_FOR_STREAM_ID format streamId) match { + case Some(system) => system + case _ => config.getDefaultSystem.getOrElse(throw new ConfigException("Missing %s configuration. Cannot bind stream to a system without it." + format(StreamConfig.SYSTEM_FOR_STREAM_ID format(streamId)))) + } + } + + /** + * Gets the physical name for the specified streamId. + * + * @param streamId the identifier for the stream in the config. + * @param defaultPhysicalName the default to use if the physical name is missing. + * @return the physical identifier for the stream or the default if it is undefined. 
+ */ + def getPhysicalName(streamId: String, defaultPhysicalName: String) = { + get(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID format streamId, defaultPhysicalName) + } } http://git-wip-us.apache.org/repos/asf/samza/blob/674e5231/samza-core/src/test/java/org/apache/samza/system/TestAbstractExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/system/TestAbstractExecutionEnvironment.java b/samza-core/src/test/java/org/apache/samza/system/TestAbstractExecutionEnvironment.java new file mode 100644 index 0000000..e547322 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/system/TestAbstractExecutionEnvironment.java @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.system; + +import java.util.HashMap; +import java.util.Map; +import org.apache.samza.config.Config; +import org.apache.samza.config.ConfigException; +import org.apache.samza.config.JobConfig; +import org.apache.samza.config.MapConfig; +import org.apache.samza.config.StreamConfig; +import org.apache.samza.operators.StreamGraphBuilder; +import org.junit.Test; + +import static org.junit.Assert.*; +public class TestAbstractExecutionEnvironment { + private static final String STREAM_ID = "t3st-Stream_Id"; + private static final String STREAM_ID_INVALID = "test#Str3amId!"; + + private static final String TEST_PHYSICAL_NAME = "t3st-Physical_Name"; + private static final String TEST_PHYSICAL_NAME2 = "testPhysicalName2"; + private static final String TEST_PHYSICAL_NAME_SPECIAL_CHARS = "test://Physical.Name?"; + + private static final String TEST_SYSTEM = "t3st-System_Name"; + private static final String TEST_SYSTEM2 = "testSystemName2"; + private static final String TEST_SYSTEM_INVALID = "test:System!Name@"; + + private static final String TEST_DEFAULT_SYSTEM = "testDefaultSystemName"; + + + @Test(expected = NullPointerException.class) + public void testConfigValidation() { + new TestAbstractExecutionEnvironmentImpl(null); + } + + // The physical name should be pulled from the StreamConfig.PHYSICAL_NAME property value. + @Test + public void testStreamFromConfigWithPhysicalNameInConfig() { + Config config = buildStreamConfig(STREAM_ID, + StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME, + StreamConfig.SYSTEM(), TEST_SYSTEM); + + ExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config); + StreamSpec spec = env.streamFromConfig(STREAM_ID); + + assertEquals(TEST_PHYSICAL_NAME, spec.getPhysicalName()); + } + + // The streamId should be used as the physicalName when the physical name is not specified. + // NOTE: its either this, set to null, or exception. This seems better for backward compatibility and API brevity. 
+ @Test + public void testStreamFromConfigWithoutPhysicalNameInConfig() { + Config config = buildStreamConfig(STREAM_ID, + StreamConfig.SYSTEM(), TEST_SYSTEM); + + ExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config); + StreamSpec spec = env.streamFromConfig(STREAM_ID); + + assertEquals(STREAM_ID, spec.getPhysicalName()); + } + + // If the system is specified at the stream scope, use it + @Test + public void testStreamFromConfigWithSystemAtStreamScopeInConfig() { + Config config = buildStreamConfig(STREAM_ID, + StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME, + StreamConfig.SYSTEM(), TEST_SYSTEM); + + ExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config); + StreamSpec spec = env.streamFromConfig(STREAM_ID); + + assertEquals(TEST_SYSTEM, spec.getSystemName()); + } + + // If system isn't specified at stream scope, use the default system + @Test + public void testStreamFromConfigWithSystemAtDefaultScopeInConfig() { + Config config = addConfigs(buildStreamConfig(STREAM_ID, + StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME), + JobConfig.JOB_DEFAULT_SYSTEM(), TEST_DEFAULT_SYSTEM); + + ExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config); + StreamSpec spec = env.streamFromConfig(STREAM_ID); + + assertEquals(TEST_DEFAULT_SYSTEM, spec.getSystemName()); + } + + // Stream scope should override default scope + @Test + public void testStreamFromConfigWithSystemAtBothScopesInConfig() { + Config config = addConfigs(buildStreamConfig(STREAM_ID, + StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME, + StreamConfig.SYSTEM(), TEST_SYSTEM), + JobConfig.JOB_DEFAULT_SYSTEM(), TEST_DEFAULT_SYSTEM); + + ExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config); + StreamSpec spec = env.streamFromConfig(STREAM_ID); + + assertEquals(TEST_SYSTEM, spec.getSystemName()); + } + + // System is required. Throw if it cannot be determined. 
+ @Test(expected = ConfigException.class) + public void testStreamFromConfigWithOutSystemInConfig() { + Config config = buildStreamConfig(STREAM_ID, + StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME); + + ExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config); + StreamSpec spec = env.streamFromConfig(STREAM_ID); + + assertEquals(TEST_SYSTEM, spec.getSystemName()); + + } + + // The properties in the config "streams.{streamId}.*" should be passed through to the spec. + @Test + public void testStreamFromConfigPropertiesPassthrough() { + Config config = buildStreamConfig(STREAM_ID, + StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME, + StreamConfig.SYSTEM(), TEST_SYSTEM, + "systemProperty1", "systemValue1", + "systemProperty2", "systemValue2"); + + ExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config); + StreamSpec spec = env.streamFromConfig(STREAM_ID); + + Map<String, String> properties = spec.getConfig(); + assertEquals(2, properties.size()); + assertEquals("systemValue1", properties.get("systemProperty1")); + assertEquals("systemValue2", properties.get("systemProperty2")); + assertEquals("systemValue1", spec.get("systemProperty1")); + assertEquals("systemValue2", spec.get("systemProperty2")); + } + + // The samza properties (which are invalid for the underlying system) should be filtered out. 
+ @Test + public void testStreamFromConfigSamzaPropertiesOmitted() { + Config config = buildStreamConfig(STREAM_ID, + StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME, + StreamConfig.SYSTEM(), TEST_SYSTEM, + "systemProperty1", "systemValue1", + "systemProperty2", "systemValue2"); + + ExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config); + StreamSpec spec = env.streamFromConfig(STREAM_ID); + + Map<String, String> properties = spec.getConfig(); + assertEquals(2, properties.size()); + assertNull(properties.get(String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), STREAM_ID))); + assertNull(properties.get(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), STREAM_ID))); + assertNull(spec.get(String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), STREAM_ID))); + assertNull(spec.get(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), STREAM_ID))); + } + + // When the physicalName argument is passed explicitly it should be used, regardless of whether it is also in the config + @Test + public void testStreamFromConfigPhysicalNameArgSimple() { + Config config = buildStreamConfig(STREAM_ID, + StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2, // This should be ignored because of the explicit arg + StreamConfig.SYSTEM(), TEST_SYSTEM); + + ExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config); + StreamSpec spec = env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME); + + assertEquals(STREAM_ID, spec.getId()); + assertEquals(TEST_PHYSICAL_NAME, spec.getPhysicalName()); + assertEquals(TEST_SYSTEM, spec.getSystemName()); + } + + // Special characters are allowed for the physical name + @Test + public void testStreamFromConfigPhysicalNameArgSpecialCharacters() { + Config config = buildStreamConfig(STREAM_ID, + StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2, + StreamConfig.SYSTEM(), TEST_SYSTEM); + + ExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config); + StreamSpec spec = env.streamFromConfig(STREAM_ID, 
TEST_PHYSICAL_NAME_SPECIAL_CHARS); + assertEquals(TEST_PHYSICAL_NAME_SPECIAL_CHARS, spec.getPhysicalName()); + } + + // Null is allowed for the physical name + @Test + public void testStreamFromConfigPhysicalNameArgNull() { + Config config = buildStreamConfig(STREAM_ID, + StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2, + StreamConfig.SYSTEM(), TEST_SYSTEM); + + ExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config); + StreamSpec spec = env.streamFromConfig(STREAM_ID, null); + assertNull(spec.getPhysicalName()); + } + + // When the system name is provided explicitly, it should be used, regardless of whether it's also in the config + @Test + public void testStreamFromConfigSystemNameArgValid() { + Config config = buildStreamConfig(STREAM_ID, + StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2, // This should be ignored because of the explicit arg + StreamConfig.SYSTEM(), TEST_SYSTEM2); // This too + + ExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config); + StreamSpec spec = env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME, TEST_SYSTEM); + + assertEquals(STREAM_ID, spec.getId()); + assertEquals(TEST_PHYSICAL_NAME, spec.getPhysicalName()); + assertEquals(TEST_SYSTEM, spec.getSystemName()); + } + + // Special characters are NOT allowed for system name, because it's used as an identifier in the config. + @Test(expected = IllegalArgumentException.class) + public void testStreamFromConfigSystemNameArgInvalid() { + Config config = buildStreamConfig(STREAM_ID, + StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2, + StreamConfig.SYSTEM(), TEST_SYSTEM2); + + ExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config); + env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME, TEST_SYSTEM_INVALID); + } + + // Null is not allowed for system name. 
+ @Test(expected = NullPointerException.class) + public void testStreamFromConfigSystemNameArgNull() { + Config config = buildStreamConfig(STREAM_ID, + StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2, + StreamConfig.SYSTEM(), TEST_SYSTEM2); + + ExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config); + env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME, null); + } + + // Special characters are NOT allowed for streamId, because it's used as an identifier in the config. + @Test(expected = IllegalArgumentException.class) + public void testStreamFromConfigStreamIdInvalid() { + Config config = buildStreamConfig(STREAM_ID_INVALID, + StreamConfig.SYSTEM(), TEST_SYSTEM); + + ExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config); + env.streamFromConfig(STREAM_ID_INVALID); + } + + // Null is not allowed for streamId. + @Test(expected = NullPointerException.class) + public void testStreamFromConfigStreamIdNull() { + Config config = buildStreamConfig(null, + StreamConfig.SYSTEM(), TEST_SYSTEM); + + ExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config); + env.streamFromConfig(null); + } + + + // Helper methods + + private Config buildStreamConfig(String streamId, String... kvs) { + // inject streams.x. into each key + for (int i = 0; i < kvs.length - 1; i += 2) { + kvs[i] = String.format(StreamConfig.STREAM_PREFIX_BY_ID(), streamId) + kvs[i]; + } + return buildConfig(kvs); + } + + private Config buildConfig(String... kvs) { + if (kvs.length % 2 != 0) { + throw new IllegalArgumentException("There must be parity between the keys and values"); + } + + Map<String, String> configMap = new HashMap<>(); + for (int i = 0; i < kvs.length - 1; i += 2) { + configMap.put(kvs[i], kvs[i + 1]); + } + return new MapConfig(configMap); + } + + private Config addConfigs(Config original, String... 
kvs) { + Map<String, String> result = new HashMap<>(); + result.putAll(original); + result.putAll(buildConfig(kvs)); + return new MapConfig(result); + } + + private class TestAbstractExecutionEnvironmentImpl extends AbstractExecutionEnvironment { + + public TestAbstractExecutionEnvironmentImpl(Config config) { + super(config); + } + + @Override + public void run(StreamGraphBuilder graphBuilder, Config config) { + // do nothing + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/674e5231/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala index e355e7e..7e9f18a 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala @@ -36,6 +36,7 @@ import java.util import scala.collection.JavaConverters._ import org.apache.samza.system.kafka.KafkaSystemFactory import org.apache.samza.config.SystemConfig.Config2System +import org.apache.samza.config.StreamConfig.Config2Stream import org.apache.kafka.common.serialization.ByteArraySerializer object KafkaConfig { @@ -165,13 +166,6 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) { kafkaChangeLogProperties } - def getTopicKafkaProperties(systemName: String, streamName: String) = { - val filteredConfigs = config.subset(StreamConfig.STREAM_PREFIX format(systemName, streamName), true) - val topicProperties = new Properties - filteredConfigs.foreach { kv => topicProperties.setProperty(kv._1, kv._2) } - topicProperties - } - // kafka config def getKafkaSystemConsumerConfig( systemName: String, clientId: String,