Repository: samza Updated Branches: refs/heads/master e85b01dcb -> 0f06da1f8
SAMZA-1416: Better logging around the exception where class loading failed in initializing the SystemFactory for an input/output system. Also added test coverage for the Util.getObj method. nickpan47 jmakes Author: Daniel Nishimura <dnishim...@gmail.com> Reviewers: Jacob Maes <jm...@linkedin.com> Closes #296 from dnishimura/samza-1416 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/0f06da1f Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/0f06da1f Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/0f06da1f Branch: refs/heads/master Commit: 0f06da1f80566f3090f9826b46fe46a3fadcd3f2 Parents: e85b01d Author: Daniel Nishimura <dnishim...@gmail.com> Authored: Tue Sep 19 14:26:38 2017 -0700 Committer: Jacob Maes <jm...@linkedin.com> Committed: Tue Sep 19 14:26:38 2017 -0700 ---------------------------------------------------------------------- .../apache/samza/config/JavaSystemConfig.java | 14 ++-- .../org/apache/samza/config/SystemConfig.scala | 16 ++--- .../main/scala/org/apache/samza/util/Util.scala | 15 +++-- .../samza/config/TestJavaSystemConfig.java | 35 ++++++++-- .../apache/samza/config/TestSystemConfig.scala | 67 ++++++++++++++++++++ .../scala/org/apache/samza/util/TestUtil.scala | 13 ++++ 6 files changed, 137 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/0f06da1f/samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java b/samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java index 350f20c..57707aa 100644 --- a/samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java @@ -23,6 +23,7 @@ import
java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import org.apache.commons.lang3.StringUtils; import org.apache.samza.SamzaException; import org.apache.samza.system.SystemAdmin; import org.apache.samza.system.SystemFactory; @@ -33,10 +34,10 @@ import org.apache.samza.util.Util; * a java version of the system config */ public class JavaSystemConfig extends MapConfig { - private static final String SYSTEM_PREFIX = "systems."; - private static final String SYSTEM_FACTORY_SUFFIX = ".samza.factory"; - private static final String SYSTEM_FACTORY_FORMAT = SYSTEM_PREFIX + "%s" + SYSTEM_FACTORY_SUFFIX; - private static final String SYSTEM_DEFAULT_STREAMS_PREFIX = SYSTEM_PREFIX + "%s" + ".default.stream."; + public static final String SYSTEM_PREFIX = "systems."; + public static final String SYSTEM_FACTORY_SUFFIX = ".samza.factory"; + public static final String SYSTEM_FACTORY_FORMAT = SYSTEM_PREFIX + "%s" + SYSTEM_FACTORY_SUFFIX; + private static final String SYSTEM_DEFAULT_STREAMS_PREFIX_FORMAT = SYSTEM_PREFIX + "%s" + ".default.stream."; private static final String EMPTY = ""; public JavaSystemConfig(Config config) { @@ -48,7 +49,8 @@ public class JavaSystemConfig extends MapConfig { return null; } String systemFactory = String.format(SYSTEM_FACTORY_FORMAT, name); - return get(systemFactory, null); + String value = get(systemFactory, null); + return (StringUtils.isBlank(value)) ? null : value; } /** @@ -108,6 +110,6 @@ public class JavaSystemConfig extends MapConfig { * @return a subset of the config with the system prefix removed. 
*/ public Config getDefaultStreamProperties(String systemName) { - return subset(String.format(SYSTEM_DEFAULT_STREAMS_PREFIX, systemName), true); + return subset(String.format(SYSTEM_DEFAULT_STREAMS_PREFIX_FORMAT, systemName), true); } } http://git-wip-us.apache.org/repos/asf/samza/blob/0f06da1f/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala index 804955c..91fb261 100644 --- a/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala +++ b/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala @@ -27,15 +27,17 @@ import org.apache.samza.util.Logging */ object SystemConfig { // system config constants - val SYSTEM_PREFIX = "systems.%s." - val SYSTEM_FACTORY = "systems.%s.samza.factory" + val SYSTEM_PREFIX = JavaSystemConfig.SYSTEM_PREFIX + "%s." + val SYSTEM_FACTORY = JavaSystemConfig.SYSTEM_FACTORY_FORMAT val CONSUMER_OFFSET_DEFAULT = SYSTEM_PREFIX + "samza.offset.default" implicit def Config2System(config: Config) = new SystemConfig(config) } class SystemConfig(config: Config) extends ScalaMapConfig(config) with Logging { - def getSystemFactory(name: String) = getOption(SystemConfig.SYSTEM_FACTORY format name) + val javaSystemConfig = new JavaSystemConfig(config) + + def getSystemFactory(name: String) = Option(javaSystemConfig.getSystemFactory(name)) def getSystemKeySerde(name: String) = getSystemDefaultStreamProperty(name, StreamConfig.KEY_SERDE) @@ -47,14 +49,10 @@ class SystemConfig(config: Config) extends ScalaMapConfig(config) with Logging { * Returns a list of all system names from the config file. Useful for * getting individual systems. 
*/ - def getSystemNames() = { - val subConf = config.subset("systems.", true) - // find all .samza.factory keys, and strip the suffix - subConf.asScala.keys.filter(k => k.endsWith(".samza.factory")).map(_.replace(".samza.factory", "")) - } + def getSystemNames() = javaSystemConfig.getSystemNames().asScala private def getSystemDefaultStreamProperty(name: String, property: String) = { - val defaultStreamProperties = new JavaSystemConfig(config).getDefaultStreamProperties(name) + val defaultStreamProperties = javaSystemConfig.getDefaultStreamProperties(name) val streamDefault = defaultStreamProperties.get(property) if (!(streamDefault == null || streamDefault.isEmpty)) { Option(streamDefault) http://git-wip-us.apache.org/repos/asf/samza/blob/0f06da1f/samza-core/src/main/scala/org/apache/samza/util/Util.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala index 6c224e6..d639620 100644 --- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala +++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala @@ -81,10 +81,17 @@ object Util extends Logging { * Instantiate a class instance from a given className. */ def getObj[T](className: String) = { - Class - .forName(className) - .newInstance - .asInstanceOf[T] + try { + Class + .forName(className) + .newInstance + .asInstanceOf[T] + } catch { + case e: Throwable => { + error("Unable to instantiate a class instance for %s." 
format className, e) + throw e + } + } } /** http://git-wip-us.apache.org/repos/asf/samza/blob/0f06da1f/samza-core/src/test/java/org/apache/samza/config/TestJavaSystemConfig.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/config/TestJavaSystemConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestJavaSystemConfig.java index 9b39ec8..94ba374 100644 --- a/samza-core/src/test/java/org/apache/samza/config/TestJavaSystemConfig.java +++ b/samza-core/src/test/java/org/apache/samza/config/TestJavaSystemConfig.java @@ -27,15 +27,42 @@ import java.util.Map; import org.junit.Test; public class TestJavaSystemConfig { + private static final String MOCK_SYSTEM_NAME1 = "mocksystem1"; + private static final String MOCK_SYSTEM_NAME2 = "mocksystem2"; + private static final String MOCK_SYSTEM_FACTORY_NAME1 = String.format(JavaSystemConfig.SYSTEM_FACTORY_FORMAT, MOCK_SYSTEM_NAME1); + private static final String MOCK_SYSTEM_FACTORY_NAME2 = String.format(JavaSystemConfig.SYSTEM_FACTORY_FORMAT, MOCK_SYSTEM_NAME2); + private static final String MOCK_SYSTEM_FACTORY_CLASSNAME1 = "some.factory.Class1"; + private static final String MOCK_SYSTEM_FACTORY_CLASSNAME2 = "some.factory.Class2"; + + @Test + public void testClassName() { + Map<String, String> map = new HashMap<String, String>(); + map.put(MOCK_SYSTEM_FACTORY_NAME1, MOCK_SYSTEM_FACTORY_CLASSNAME1); + JavaSystemConfig systemConfig = new JavaSystemConfig(new MapConfig(map)); + + assertEquals(MOCK_SYSTEM_FACTORY_CLASSNAME1, systemConfig.getSystemFactory(MOCK_SYSTEM_NAME1)); + } + + @Test + public void testGetEmptyClassNameAsNull() { + Map<String, String> map = new HashMap<String, String>(); + map.put(MOCK_SYSTEM_FACTORY_NAME1, ""); + map.put(MOCK_SYSTEM_FACTORY_NAME2, " "); + JavaSystemConfig systemConfig = new JavaSystemConfig(new MapConfig(map)); + + assertNull(systemConfig.getSystemFactory(MOCK_SYSTEM_NAME1)); + 
assertNull(systemConfig.getSystemFactory(MOCK_SYSTEM_NAME2)); + } @Test public void testGetSystemNames() { Map<String, String> map = new HashMap<String, String>(); - map.put("systems.system1.samza.factory", "1"); - map.put("systems.system2.samza.factory", "2"); - JavaSystemConfig systemConfig = new JavaSystemConfig( - new MapConfig(map)); + map.put(MOCK_SYSTEM_FACTORY_NAME1, MOCK_SYSTEM_FACTORY_CLASSNAME1); + map.put(MOCK_SYSTEM_FACTORY_NAME2, MOCK_SYSTEM_FACTORY_CLASSNAME2); + JavaSystemConfig systemConfig = new JavaSystemConfig(new MapConfig(map)); assertEquals(2, systemConfig.getSystemNames().size()); + assertTrue(systemConfig.getSystemNames().contains(MOCK_SYSTEM_NAME1)); + assertTrue(systemConfig.getSystemNames().contains(MOCK_SYSTEM_NAME2)); } } http://git-wip-us.apache.org/repos/asf/samza/blob/0f06da1f/samza-core/src/test/scala/org/apache/samza/config/TestSystemConfig.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/config/TestSystemConfig.scala b/samza-core/src/test/scala/org/apache/samza/config/TestSystemConfig.scala new file mode 100644 index 0000000..cc54d00 --- /dev/null +++ b/samza-core/src/test/scala/org/apache/samza/config/TestSystemConfig.scala @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.config + +import scala.collection.JavaConverters._ +import org.apache.samza.config.SystemConfig.{Config2System, SYSTEM_FACTORY} +import org.junit.Assert._ +import org.junit.Test + +class TestSystemConfig { + val MOCK_SYSTEM_NAME1 = "mocksystem1" + val MOCK_SYSTEM_NAME2 = "mocksystem2" + val MOCK_SYSTEM_FACTORY_NAME1 = SYSTEM_FACTORY.format(MOCK_SYSTEM_NAME1) + val MOCK_SYSTEM_FACTORY_NAME2 = SYSTEM_FACTORY.format(MOCK_SYSTEM_NAME2) + val MOCK_SYSTEM_FACTORY_CLASSNAME1 = "some.factory.Class1" + val MOCK_SYSTEM_FACTORY_CLASSNAME2 = "some.factory.Class2" + + def testClassName { + val configMap = Map[String, String]( + MOCK_SYSTEM_FACTORY_NAME1 -> MOCK_SYSTEM_FACTORY_CLASSNAME1 + ) + val config = new MapConfig(configMap.asJava) + + assertEquals(MOCK_SYSTEM_FACTORY_CLASSNAME1, config.getSystemFactory(MOCK_SYSTEM_NAME1).getOrElse("")) + } + + @Test + def testGetEmptyClassNameAsNull { + val configMap = Map[String, String]( + MOCK_SYSTEM_FACTORY_NAME1 -> "", + MOCK_SYSTEM_FACTORY_NAME1 -> " " + ) + val config = new MapConfig(configMap.asJava) + + assertEquals(config.getSystemFactory(MOCK_SYSTEM_NAME1), None) + assertEquals(config.getSystemFactory(MOCK_SYSTEM_NAME2), None) + } + + def testGetSystemNames { + val configMap = Map[String, String]( + MOCK_SYSTEM_FACTORY_NAME1 -> MOCK_SYSTEM_FACTORY_CLASSNAME1, + MOCK_SYSTEM_FACTORY_NAME2 -> MOCK_SYSTEM_FACTORY_CLASSNAME2 + ) + val config = new MapConfig(configMap.asJava) + val systemNames = config.getSystemNames() + + assertTrue(systemNames.contains(MOCK_SYSTEM_NAME1)) + assertTrue(systemNames.contains(MOCK_SYSTEM_NAME2)) + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/0f06da1f/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala ---------------------------------------------------------------------- diff --git 
a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala index da7c71d..1f7dc01 100644 --- a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala +++ b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala @@ -110,4 +110,17 @@ class TestUtil { assertEquals(Long.MinValue, Util.clampAdd(Long.MinValue, Long.MinValue)) assertEquals(-1, Util.clampAdd(Long.MaxValue, Long.MinValue)) } + + @Test + def testGetObjExistingClass() { + val obj = Util.getObj[MapConfig]("org.apache.samza.config.MapConfig") + assertNotNull(obj) + assertEquals(classOf[MapConfig], obj.getClass()) + } + + @Test(expected = classOf[ClassNotFoundException]) + def testGetObjNonexistentClass() { + Util.getObj("this.class.does.NotExist") + assert(false, "This should not get hit.") + } }