This is an automated email from the ASF dual-hosted git repository. shanthoosh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push: new f319db5 SAMZA-2131: [Scala cleanup] Convert FileSystemCheckpointManagerConfig.scala and SystemConfig.scala to Java (#955) f319db5 is described below commit f319db519168cf06de14232de99ef0cf38fdb9f7 Author: cameronlee314 <37879374+cameronlee...@users.noreply.github.com> AuthorDate: Tue Mar 26 11:23:29 2019 -0700 SAMZA-2131: [Scala cleanup] Convert FileSystemCheckpointManagerConfig.scala and SystemConfig.scala to Java (#955) * moving FileSystemCheckpointManagerConfig.scala and SystemConfig.scala to java * renaming JavaSystemConfig to SystemConfig now that the scala version is gone --- .../config/FileSystemCheckpointManagerConfig.java} | 22 +- .../{JavaSystemConfig.java => SystemConfig.java} | 97 ++++++-- .../samza/storage/ChangelogStreamManager.java | 4 +- .../org/apache/samza/storage/StorageRecovery.java | 4 +- .../java/org/apache/samza/system/SystemAdmins.java | 4 +- .../apache/samza/checkpoint/OffsetManager.scala | 4 +- .../file/FileSystemCheckpointManager.scala | 9 +- .../org/apache/samza/config/StreamConfig.scala | 2 +- .../org/apache/samza/config/SystemConfig.scala | 68 ----- .../apache/samza/container/SamzaContainer.scala | 17 +- .../apache/samza/coordinator/JobModelManager.scala | 8 - .../reporter/MetricsSnapshotReporterFactory.scala | 10 +- .../apache/samza/util/CoordinatorStreamUtil.scala | 12 +- .../TestFileSystemCheckpointManagerConfig.java | 43 ++++ .../apache/samza/config/TestJavaSystemConfig.java | 68 ----- .../org/apache/samza/config/TestSystemConfig.java | 273 +++++++++++++++++++++ .../samza/checkpoint/TestCheckpointTool.scala | 8 +- .../org/apache/samza/config/TestSystemConfig.scala | 67 ----- .../samza/coordinator/TestJobCoordinator.scala | 16 +- .../TestRangeSystemStreamPartitionMatcher.scala | 4 +- .../TestRegexSystemStreamPartitionMatcher.scala | 4 +- .../apache/samza/config/KafkaConsumerConfig.java | 6 +- .../samza/system/kafka/KafkaSystemAdmin.java | 2 +- .../kafka/KafkaCheckpointManagerFactory.scala | 8 +- .../org/apache/samza/config/KafkaConfig.scala | 17 +- .../samza/config_deprecated/KafkaConfig.scala | 14 +- .../kafka_deprecated/KafkaSystemFactory.scala | 4 +- .../kafka/TestKafkaCheckpointManager.scala | 9 +- .../org/apache/samza/config/Log4jSystemConfig.java | 2 +- .../apache/samza/logging/log4j/StreamAppender.java | 11 +- .../org/apache/samza/config/Log4jSystemConfig.java | 2 +- .../samza/logging/log4j2/StreamAppender.java | 11 +- .../descriptors/InMemorySystemDescriptor.java | 4 +- .../benchmark/SystemConsumerWithSamzaBench.java | 2 +- 34 files changed, 493 insertions(+), 343 deletions(-) diff --git a/samza-core/src/main/scala/org/apache/samza/config/FileSystemCheckpointManagerConfig.scala b/samza-core/src/main/java/org/apache/samza/config/FileSystemCheckpointManagerConfig.java similarity index 60% rename from samza-core/src/main/scala/org/apache/samza/config/FileSystemCheckpointManagerConfig.scala rename to samza-core/src/main/java/org/apache/samza/config/FileSystemCheckpointManagerConfig.java index 707ea59..e131895 100644 --- a/samza-core/src/main/scala/org/apache/samza/config/FileSystemCheckpointManagerConfig.scala +++ b/samza-core/src/main/java/org/apache/samza/config/FileSystemCheckpointManagerConfig.java @@ -16,16 +16,22 @@ * specific language governing permissions and limitations * under the License. */ +package org.apache.samza.config; -package org.apache.samza.config +import java.util.Optional; -object FileSystemCheckpointManagerConfig { - // file system checkpoint manager config constants - val CHECKPOINT_MANAGER_ROOT = "task.checkpoint.path" // system name to use when sending offset checkpoints - implicit def Config2FSCP(config: Config) = new FileSystemCheckpointManagerConfig(config) -} +public class FileSystemCheckpointManagerConfig extends MapConfig { + /** + * Path on local file system where checkpoints should be stored. + */ + private static final String CHECKPOINT_MANAGER_ROOT = "task.checkpoint.path"; + + public FileSystemCheckpointManagerConfig(Config config) { + super(config); + } -class FileSystemCheckpointManagerConfig(config: Config) extends ScalaMapConfig(config) { - def getFileSystemCheckpointRoot = getOption(FileSystemCheckpointManagerConfig.CHECKPOINT_MANAGER_ROOT) + public Optional<String> getFileSystemCheckpointRoot() { + return Optional.ofNullable(get(CHECKPOINT_MANAGER_ROOT)); + } } diff --git a/samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java b/samza-core/src/main/java/org/apache/samza/config/SystemConfig.java similarity index 55% rename from samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java rename to samza-core/src/main/java/org/apache/samza/config/SystemConfig.java index fde98c6..93a0c32 100644 --- a/samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/SystemConfig.java @@ -22,38 +22,49 @@ package org.apache.samza.config; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.stream.Collectors; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.lang3.StringUtils; import org.apache.samza.SamzaException; import org.apache.samza.system.SystemAdmin; import org.apache.samza.system.SystemFactory; import org.apache.samza.util.Util; - /** - * a java version of the system config + * Config helper methods related to systems. */ -public class JavaSystemConfig extends MapConfig { - public static final String SYSTEM_PREFIX = "systems."; - public static final String SYSTEM_FACTORY_SUFFIX = ".samza.factory"; - public static final String SYSTEM_FACTORY_FORMAT = SYSTEM_PREFIX + "%s" + SYSTEM_FACTORY_SUFFIX; - private static final String SYSTEM_DEFAULT_STREAMS_PREFIX_FORMAT = SYSTEM_PREFIX + "%s" + ".default.stream."; +public class SystemConfig extends MapConfig { + private static final String SYSTEMS_PREFIX = "systems."; + public static final String SYSTEM_ID_PREFIX = SYSTEMS_PREFIX + "%s."; + + private static final String SYSTEM_FACTORY_SUFFIX = ".samza.factory"; + public static final String SYSTEM_FACTORY_FORMAT = SYSTEMS_PREFIX + "%s" + SYSTEM_FACTORY_SUFFIX; + @VisibleForTesting + static final String SYSTEM_DEFAULT_STREAMS_PREFIX_FORMAT = SYSTEM_ID_PREFIX + "default.stream."; + + // If true, automatically delete committed messages from streams whose committed messages can be deleted. + // A stream's committed messages can be deleted if it is a intermediate stream, or if user has manually + // set streams.{streamId}.samza.delete.committed.messages to true in the configuration. + @VisibleForTesting + static final String DELETE_COMMITTED_MESSAGES = SYSTEM_ID_PREFIX + "samza.delete.committed.messages"; + private static final String EMPTY = ""; - public static final String SAMZA_SYSTEM_OFFSET_UPCOMING = "upcoming"; - public static final String SAMZA_SYSTEM_OFFSET_OLDEST = "oldest"; + static final String SAMZA_SYSTEM_OFFSET_UPCOMING = "upcoming"; + static final String SAMZA_SYSTEM_OFFSET_OLDEST = "oldest"; - public JavaSystemConfig(Config config) { + public SystemConfig(Config config) { super(config); } - public String getSystemFactory(String name) { - if (name == null) { - return null; + public Optional<String> getSystemFactory(String systemName) { + if (systemName == null) { + return Optional.empty(); } - String systemFactory = String.format(SYSTEM_FACTORY_FORMAT, name); + String systemFactory = String.format(SYSTEM_FACTORY_FORMAT, systemName); String value = get(systemFactory, null); - return (StringUtils.isBlank(value)) ? null : value; + return (StringUtils.isBlank(value)) ? Optional.empty() : Optional.of(value); } /** @@ -62,8 +73,8 @@ public class JavaSystemConfig extends MapConfig { * @return A list system names */ public List<String> getSystemNames() { - Config subConf = subset(SYSTEM_PREFIX, true); - ArrayList<String> systemNames = new ArrayList<String>(); + Config subConf = subset(SYSTEMS_PREFIX, true); + ArrayList<String> systemNames = new ArrayList<>(); for (Map.Entry<String, String> entry : subConf.entrySet()) { String key = entry.getKey(); if (key.endsWith(SYSTEM_FACTORY_SUFFIX)) { @@ -81,7 +92,7 @@ public class JavaSystemConfig extends MapConfig { public Map<String, SystemAdmin> getSystemAdmins() { return getSystemFactories().entrySet() .stream() - .collect(Collectors.toMap(systemNameToFactoryEntry -> systemNameToFactoryEntry.getKey(), + .collect(Collectors.toMap(Entry::getKey, systemNameToFactoryEntry -> systemNameToFactoryEntry.getValue() .getAdmin(systemNameToFactoryEntry.getKey(), this))); } @@ -105,11 +116,8 @@ public class JavaSystemConfig extends MapConfig { Map<String, SystemFactory> systemFactories = getSystemNames().stream().collect(Collectors.toMap( systemName -> systemName, systemName -> { - String systemFactoryClassName = getSystemFactory(systemName); - if (systemFactoryClassName == null) { - throw new SamzaException( - String.format("A stream uses system %s, which is missing from the configuration.", systemName)); - } + String systemFactoryClassName = getSystemFactory(systemName).orElseThrow(() -> new SamzaException( + String.format("A stream uses system %s, which is missing from the configuration.", systemName))); return Util.getObj(systemFactoryClassName, SystemFactory.class); })); @@ -147,4 +155,47 @@ public class JavaSystemConfig extends MapConfig { return systemOffsetDefault; } + + /** + * @param systemName name of the system + * @return the key serde for the {@code systemName}, or empty if it was not found + */ + public Optional<String> getSystemKeySerde(String systemName) { + return getSystemDefaultStreamProperty(systemName, StreamConfig.KEY_SERDE()); + } + + /** + * @param systemName name of the system + * @return the message serde for the {@code systemName}, or empty if it was not found + */ + public Optional<String> getSystemMsgSerde(String systemName) { + return getSystemDefaultStreamProperty(systemName, StreamConfig.MSG_SERDE()); + } + + /** + * @param systemName name of the system + * @return if messages committed to this system should automatically be deleted + */ + public boolean deleteCommittedMessages(String systemName) { + return getBoolean(String.format(DELETE_COMMITTED_MESSAGES, systemName), false); + } + + /** + * Gets the system-wide default for the {@code propertyName} for the {@code systemName}. + * This will check in a couple of different config locations for the value. + */ + private Optional<String> getSystemDefaultStreamProperty(String systemName, String propertyName) { + Map<String, String> defaultStreamProperties = getDefaultStreamProperties(systemName); + String defaultStreamProperty = defaultStreamProperties.get(propertyName); + if (StringUtils.isNotEmpty(defaultStreamProperty)) { + return Optional.of(defaultStreamProperty); + } else { + String fallbackStreamProperty = get(String.format(SYSTEM_ID_PREFIX, systemName) + propertyName); + if (StringUtils.isNotEmpty(fallbackStreamProperty)) { + return Optional.of(fallbackStreamProperty); + } else { + return Optional.empty(); + } + } + } } diff --git a/samza-core/src/main/java/org/apache/samza/storage/ChangelogStreamManager.java b/samza-core/src/main/java/org/apache/samza/storage/ChangelogStreamManager.java index ea55fe5..71635aa 100644 --- a/samza-core/src/main/java/org/apache/samza/storage/ChangelogStreamManager.java +++ b/samza-core/src/main/java/org/apache/samza/storage/ChangelogStreamManager.java @@ -26,7 +26,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.samza.SamzaException; import org.apache.samza.config.Config; import org.apache.samza.config.JavaStorageConfig; -import org.apache.samza.config.JavaSystemConfig; +import org.apache.samza.config.SystemConfig; import org.apache.samza.container.TaskName; import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage; import org.apache.samza.coordinator.stream.messages.SetChangelogMapping; @@ -116,7 +116,7 @@ public class ChangelogStreamManager { name -> StreamUtil.getSystemStreamFromNames(storageConfig.getChangelogStream(name)))); // Get SystemAdmin for changelog store's system and attempt to create the stream - JavaSystemConfig systemConfig = new JavaSystemConfig(config); + SystemConfig systemConfig = new SystemConfig(config); storeNameSystemStreamMapping.forEach((storeName, systemStream) -> { // Load system admin for this system. SystemAdmin systemAdmin = systemConfig.getSystemAdmin(systemStream.getSystem()); diff --git a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java index 4d01159..79491d3 100644 --- a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java +++ b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java @@ -26,7 +26,7 @@ import java.util.Map; import org.apache.samza.SamzaException; import org.apache.samza.config.Config; import org.apache.samza.config.JavaStorageConfig; -import org.apache.samza.config.JavaSystemConfig; +import org.apache.samza.config.SystemConfig; import org.apache.samza.config.SerializerConfig; import org.apache.samza.container.SamzaContainerMetrics; import org.apache.samza.context.ContainerContext; @@ -207,7 +207,7 @@ public class StorageRecovery extends CommandLine { StreamMetadataCache streamMetadataCache = new StreamMetadataCache(systemAdmins, 5000, clock); // don't worry about prefetching for this; looks like the tool doesn't flush to offset files anyways - Map<String, SystemFactory> systemFactories = new JavaSystemConfig(jobConfig).getSystemFactories(); + Map<String, SystemFactory> systemFactories = new SystemConfig(jobConfig).getSystemFactories(); for (ContainerModel containerModel : containers.values()) { ContainerContext containerContext = new ContainerContextImpl(containerModel, new MetricsRegistryMap()); diff --git a/samza-core/src/main/java/org/apache/samza/system/SystemAdmins.java b/samza-core/src/main/java/org/apache/samza/system/SystemAdmins.java index 242ac67..be15869 100644 --- a/samza-core/src/main/java/org/apache/samza/system/SystemAdmins.java +++ b/samza-core/src/main/java/org/apache/samza/system/SystemAdmins.java @@ -23,7 +23,7 @@ import java.util.Map; import java.util.Set; import org.apache.samza.SamzaException; import org.apache.samza.config.Config; -import org.apache.samza.config.JavaSystemConfig; +import org.apache.samza.config.SystemConfig; import org.apache.samza.config.MapConfig; @@ -34,7 +34,7 @@ public class SystemAdmins { private final Map<String, SystemAdmin> systemAdminMap; public SystemAdmins(Config config) { - JavaSystemConfig systemConfig = new JavaSystemConfig(config); + SystemConfig systemConfig = new SystemConfig(config); this.systemAdminMap = systemConfig.getSystemAdmins(); } diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala index f5cc6fd..09f778a 100644 --- a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala +++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala @@ -25,7 +25,7 @@ import java.util.concurrent.ConcurrentHashMap import org.apache.samza.SamzaException import org.apache.samza.annotation.InterfaceStability import org.apache.samza.config.StreamConfig.Config2Stream -import org.apache.samza.config.{Config, JavaSystemConfig} +import org.apache.samza.config.{Config, SystemConfig} import org.apache.samza.container.TaskName import org.apache.samza.startpoint.{Startpoint, StartpointManager} import org.apache.samza.system.SystemStreamMetadata.OffsetType @@ -84,7 +84,7 @@ object OffsetManager extends Logging { case (systemStream, systemStreamMetadata) => // Get default offset. val streamDefaultOffset = config.getDefaultStreamOffset(systemStream) - val systemDefaultOffset = new JavaSystemConfig(config).getSystemOffsetDefault(systemStream.getSystem) + val systemDefaultOffset = new SystemConfig(config).getSystemOffsetDefault(systemStream.getSystem) val defaultOffsetType = if (streamDefaultOffset.isDefined) { OffsetType.valueOf(streamDefaultOffset.get.toUpperCase) } else if (systemDefaultOffset != null) { diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala index edd0ace..68afda1 100644 --- a/samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala +++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala @@ -22,17 +22,16 @@ package org.apache.samza.checkpoint.file import java.io.File import java.io.FileNotFoundException import java.io.FileOutputStream -import java.util import org.apache.samza.SamzaException import org.apache.samza.checkpoint.Checkpoint import org.apache.samza.checkpoint.CheckpointManager import org.apache.samza.checkpoint.CheckpointManagerFactory -import org.apache.samza.config.Config -import org.apache.samza.config.FileSystemCheckpointManagerConfig.Config2FSCP +import org.apache.samza.config.{Config, FileSystemCheckpointManagerConfig} import org.apache.samza.config.JobConfig.Config2Job import org.apache.samza.container.TaskName import org.apache.samza.metrics.MetricsRegistry import org.apache.samza.serializers.CheckpointSerde +import org.apache.samza.util.ScalaJavaUtil.JavaOptionals import scala.io.Source class FileSystemCheckpointManager( @@ -79,8 +78,8 @@ class FileSystemCheckpointManagerFactory extends CheckpointManagerFactory { val name = config .getName .getOrElse(throw new SamzaException("Missing job name in configs")) - val root = config - .getFileSystemCheckpointRoot + val fileSystemCheckpointManagerConfig = new FileSystemCheckpointManagerConfig(config) + val root = JavaOptionals.toRichOptional(fileSystemCheckpointManagerConfig.getFileSystemCheckpointRoot).toOption .getOrElse(throw new SamzaException("Missing checkpoint root in configs")) new FileSystemCheckpointManager(name, new File(root)) } diff --git a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala index 252e38f..5890c07 100644 --- a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala +++ b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala @@ -273,7 +273,7 @@ class StreamConfig(config: Config) extends ScalaMapConfig(config) with Logging { if (systemName == null) { Map() } - val systemConfig = new JavaSystemConfig(config) + val systemConfig = new SystemConfig(config) val defaults = systemConfig.getDefaultStreamProperties(systemName) val explicitConfigs = config.subset(StreamConfig.STREAM_PREFIX format(systemName, streamName), true) new MapConfig(defaults, explicitConfigs) diff --git a/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala deleted file mode 100644 index fd508c2..0000000 --- a/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.config - -import scala.collection.JavaConverters._ -import org.apache.samza.util.Logging - -/** - * Note: All new methods are being added to [[org.apache.samza.config.JavaSystemConfig]] - */ -object SystemConfig { - // system config constants - val SYSTEM_PREFIX = JavaSystemConfig.SYSTEM_PREFIX + "%s." - val SYSTEM_FACTORY = JavaSystemConfig.SYSTEM_FACTORY_FORMAT - val CONSUMER_OFFSET_DEFAULT = SYSTEM_PREFIX + "samza.offset.default" - - // If true, automatically delete committed messages from streams whose committed messages can be deleted. - // A stream's committed messages can be deleted if it is a intermediate stream, or if user has manually - // set streams.{streamId}.samza.delete.committed.messages to true in the configuration. - val DELETE_COMMITTED_MESSAGES = SYSTEM_PREFIX + "samza.delete.committed.messages" - - implicit def Config2System(config: Config) = new SystemConfig(config) -} - -class SystemConfig(config: Config) extends ScalaMapConfig(config) with Logging { - val javaSystemConfig = new JavaSystemConfig(config) - - def getSystemFactory(name: String) = Option(javaSystemConfig.getSystemFactory(name)) - - def getSystemKeySerde(name: String) = getSystemDefaultStreamProperty(name, StreamConfig.KEY_SERDE) - - def getSystemMsgSerde(name: String) = getSystemDefaultStreamProperty(name, StreamConfig.MSG_SERDE) - - def deleteCommittedMessages(systemName: String) = getBoolean(SystemConfig.DELETE_COMMITTED_MESSAGES format (systemName), false) - - /** - * Returns a list of all system names from the config file. Useful for - * getting individual systems. - */ - def getSystemNames() = javaSystemConfig.getSystemNames().asScala - - private def getSystemDefaultStreamProperty(name: String, property: String) = { - val defaultStreamProperties = javaSystemConfig.getDefaultStreamProperties(name) - val streamDefault = defaultStreamProperties.get(property) - if (!(streamDefault == null || streamDefault.isEmpty)) { - Option(streamDefault) - } else { - getNonEmptyOption((SystemConfig.SYSTEM_PREFIX + property) format name) - } - } -} diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index 70ff87d..d2b8f8f 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -37,7 +37,6 @@ import org.apache.samza.config.MetricsConfig.Config2Metrics import org.apache.samza.config.SerializerConfig.Config2Serializer import org.apache.samza.config.StorageConfig.Config2Storage import org.apache.samza.config.StreamConfig.Config2Stream -import org.apache.samza.config.SystemConfig.Config2System import org.apache.samza.config.TaskConfig.Config2Task import org.apache.samza.config._ import org.apache.samza.container.disk.DiskSpaceMonitor.Listener @@ -50,13 +49,12 @@ import org.apache.samza.metrics.{JmxServer, JvmMetrics, MetricsRegistryMap, Metr import org.apache.samza.serializers._ import org.apache.samza.serializers.model.SamzaObjectMapper import org.apache.samza.startpoint.StartpointManager -import org.apache.samza.storage.StorageEngineFactory.StoreMode import org.apache.samza.storage._ import org.apache.samza.system._ import org.apache.samza.system.chooser.{DefaultChooser, MessageChooserFactory, RoundRobinChooserFactory} import org.apache.samza.table.TableManager -import org.apache.samza.table.utils.SerdeUtils import org.apache.samza.task._ +import org.apache.samza.util.ScalaJavaUtil.JavaOptionals import org.apache.samza.util.{Util, _} import org.apache.samza.{SamzaContainerStatus, SamzaException} @@ -139,6 +137,7 @@ object SamzaContainer extends Logging { externalContextOption: Option[ExternalContext], localityManager: LocalityManager = null) = { val config = jobContext.getConfig + val systemConfig = new SystemConfig(config) val containerModel = jobModel.getContainers.get(containerId) val containerName = "samza-container-%s" format containerId val maxChangeLogStreamPartitions = jobModel.maxChangeLogStreamPartitions @@ -192,8 +191,7 @@ object SamzaContainer extends Logging { .map(_.getSystem) .toSet - - val systemNames = config.getSystemNames + val systemNames = systemConfig.getSystemNames.asScala info("Got system names: %s" format systemNames) @@ -202,8 +200,7 @@ object SamzaContainer extends Logging { info("Got serde streams: %s" format serdeStreams) val systemFactories = systemNames.map(systemName => { - val systemFactoryClassName = config - .getSystemFactory(systemName) + val systemFactoryClassName = JavaOptionals.toRichOptional(systemConfig.getSystemFactory(systemName)).toOption .getOrElse(throw new SamzaException("A stream uses system %s, which is missing from the configuration." format systemName)) (systemName, Util.getObj(systemFactoryClassName, classOf[SystemFactory])) }).toMap @@ -321,11 +318,13 @@ object SamzaContainer extends Logging { }).toMap } - val systemKeySerdes = buildSystemSerdeMap(systemName => config.getSystemKeySerde(systemName)) + val systemKeySerdes = buildSystemSerdeMap(systemName => + JavaOptionals.toRichOptional(systemConfig.getSystemKeySerde(systemName)).toOption) debug("Got system key serdes: %s" format systemKeySerdes) - val systemMessageSerdes = buildSystemSerdeMap(systemName => config.getSystemMsgSerde(systemName)) + val systemMessageSerdes = buildSystemSerdeMap(systemName => + JavaOptionals.toRichOptional(systemConfig.getSystemMsgSerde(systemName)).toOption) debug("Got system message serdes: %s" format systemMessageSerdes) diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala index c59e35e..fa9d2a7 100644 --- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala +++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala @@ -23,14 +23,8 @@ import java.util import java.util.concurrent.atomic.AtomicReference import org.apache.samza.Partition -import org.apache.samza.config.JobConfig.Config2Job -import org.apache.samza.config.SystemConfig.Config2System -import org.apache.samza.config.TaskConfig.Config2Task -import org.apache.samza.config.{Config, _} -import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory import org.apache.samza.config._ import org.apache.samza.config.JobConfig.Config2Job -import org.apache.samza.config.SystemConfig.Config2System import org.apache.samza.config.TaskConfig.Config2Task import org.apache.samza.config.Config import org.apache.samza.container.grouper.stream.{SSPGrouperProxy, SystemStreamPartitionGrouperFactory} @@ -381,8 +375,6 @@ object JobModelManager extends Logging { var containerMap = containerModels.asScala.map(containerModel => containerModel.getId -> containerModel).toMap new JobModel(config, containerMap.asJava) } - - private def getSystemNames(config: Config) = config.getSystemNames().toSet } /** diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala index 8a9c021..132a903 100644 --- a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala +++ b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala @@ -21,10 +21,9 @@ package org.apache.samza.metrics.reporter import org.apache.samza.util.{Logging, StreamUtil, Util} import org.apache.samza.SamzaException -import org.apache.samza.config.{ApplicationConfig, Config} +import org.apache.samza.config.{ApplicationConfig, Config, SystemConfig} import org.apache.samza.config.JobConfig.Config2Job import org.apache.samza.config.MetricsConfig.Config2Metrics -import org.apache.samza.config.SystemConfig.Config2System import org.apache.samza.config.StreamConfig.Config2Stream import org.apache.samza.config.SerializerConfig.Config2Serializer import org.apache.samza.config.TaskConfig.Config2Task @@ -33,6 +32,7 @@ import org.apache.samza.metrics.MetricsReporterFactory import org.apache.samza.metrics.MetricsRegistryMap import org.apache.samza.serializers.{MetricsSnapshotSerdeV2, SerdeFactory} import org.apache.samza.system.SystemFactory +import org.apache.samza.util.ScalaJavaUtil.JavaOptionals class MetricsSnapshotReporterFactory extends MetricsReporterFactory with Logging { def getMetricsReporter(name: String, containerName: String, config: Config): MetricsReporter = { @@ -72,8 +72,8 @@ class MetricsSnapshotReporterFactory extends MetricsReporterFactory with Logging val systemName = systemStream.getSystem - val systemFactoryClassName = config - .getSystemFactory(systemName) + val systemConfig = new SystemConfig(config) + val systemFactoryClassName = JavaOptionals.toRichOptional(systemConfig.getSystemFactory(systemName)).toOption .getOrElse(throw new SamzaException("Trying to fetch system factory for system %s, which isn't defined in config." format systemName)) val systemFactory = Util.getObj(systemFactoryClassName, classOf[SystemFactory]) @@ -87,7 +87,7 @@ class MetricsSnapshotReporterFactory extends MetricsReporterFactory with Logging info("Got producer %s." format producer) val streamSerdeName = config.getStreamMsgSerde(systemStream) - val systemSerdeName = config.getSystemMsgSerde(systemName) + val systemSerdeName = JavaOptionals.toRichOptional(systemConfig.getSystemMsgSerde(systemName)).toOption val serdeName = streamSerdeName.getOrElse(systemSerdeName.getOrElse(null)) val serde = if (serdeName != null) { config.getSerdeClass(serdeName) match { diff --git a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala index bfb2271..0d767c6 100644 --- a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala +++ b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala @@ -22,10 +22,10 @@ package org.apache.samza.util import org.apache.samza.SamzaException -import org.apache.samza.config.{Config, ConfigException, JobConfig, MapConfig, SystemConfig} +import org.apache.samza.config._ import org.apache.samza.system.{SystemFactory, SystemStream} import org.apache.samza.config.JobConfig.Config2Job -import org.apache.samza.config.SystemConfig.Config2System +import org.apache.samza.util.ScalaJavaUtil.JavaOptionals import scala.collection.immutable.Map import scala.collection.JavaConverters._ @@ -38,7 +38,7 @@ object CoordinatorStreamUtil { def buildCoordinatorStreamConfig(config: Config) = { val (jobName, jobId) = getJobNameAndId(config) // Build a map with just the system config and job.name/job.id. This is what's required to start the JobCoordinator. - val map = config.subset(SystemConfig.SYSTEM_PREFIX format config.getCoordinatorSystemName, false).asScala ++ + val map = config.subset(SystemConfig.SYSTEM_ID_PREFIX format config.getCoordinatorSystemName, false).asScala ++ Map[String, String]( JobConfig.JOB_NAME -> jobName, JobConfig.JOB_ID -> jobId, @@ -66,9 +66,9 @@ object CoordinatorStreamUtil { */ def getCoordinatorSystemFactory(config: Config) = { val systemName = config.getCoordinatorSystemName - val systemFactoryClassName = config - .getSystemFactory(systemName) - .getOrElse(throw new SamzaException("Missing configuration: " + SystemConfig.SYSTEM_FACTORY format systemName)) + val systemConfig = new SystemConfig(config) + val systemFactoryClassName = JavaOptionals.toRichOptional(systemConfig.getSystemFactory(systemName)).toOption + .getOrElse(throw new SamzaException("Missing configuration: " + SystemConfig.SYSTEM_FACTORY_FORMAT format systemName)) Util.getObj(systemFactoryClassName, classOf[SystemFactory]) } diff --git a/samza-core/src/test/java/org/apache/samza/config/TestFileSystemCheckpointManagerConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestFileSystemCheckpointManagerConfig.java new file mode 100644 index 0000000..35ab060 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/config/TestFileSystemCheckpointManagerConfig.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.config; + +import com.google.common.collect.ImmutableMap; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + + +public class TestFileSystemCheckpointManagerConfig { + @Test + public void testGetFileSystemCheckpointRoot() { + String checkpointManagerRoot = "checkpointManagerRoot"; + + // checkpoint path exists + Config config = new MapConfig(ImmutableMap.of("task.checkpoint.path", checkpointManagerRoot)); + FileSystemCheckpointManagerConfig fileSystemCheckpointManagerConfig = new FileSystemCheckpointManagerConfig(config); + assertEquals(checkpointManagerRoot, fileSystemCheckpointManagerConfig.getFileSystemCheckpointRoot().get()); + + // checkpoint path does not exist + config = new MapConfig(); + fileSystemCheckpointManagerConfig = new FileSystemCheckpointManagerConfig(config); + assertFalse(fileSystemCheckpointManagerConfig.getFileSystemCheckpointRoot().isPresent()); + } +} diff --git a/samza-core/src/test/java/org/apache/samza/config/TestJavaSystemConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestJavaSystemConfig.java deleted file mode 100644 index 94ba374..0000000 --- a/samza-core/src/test/java/org/apache/samza/config/TestJavaSystemConfig.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.config; - -import static org.junit.Assert.*; - -import java.util.HashMap; -import java.util.Map; - -import org.junit.Test; - -public class TestJavaSystemConfig { - private static final String MOCK_SYSTEM_NAME1 = "mocksystem1"; - private static final String MOCK_SYSTEM_NAME2 = "mocksystem2"; - private static final String MOCK_SYSTEM_FACTORY_NAME1 = String.format(JavaSystemConfig.SYSTEM_FACTORY_FORMAT, MOCK_SYSTEM_NAME1); - private static final String MOCK_SYSTEM_FACTORY_NAME2 = String.format(JavaSystemConfig.SYSTEM_FACTORY_FORMAT, MOCK_SYSTEM_NAME2); - private static final String MOCK_SYSTEM_FACTORY_CLASSNAME1 = "some.factory.Class1"; - private static final String MOCK_SYSTEM_FACTORY_CLASSNAME2 = "some.factory.Class2"; - - @Test - public void testClassName() { - Map<String, String> map = new HashMap<String, String>(); - map.put(MOCK_SYSTEM_FACTORY_NAME1, MOCK_SYSTEM_FACTORY_CLASSNAME1); - JavaSystemConfig systemConfig = new JavaSystemConfig(new MapConfig(map)); - - assertEquals(MOCK_SYSTEM_FACTORY_CLASSNAME1, systemConfig.getSystemFactory(MOCK_SYSTEM_NAME1)); - } - - @Test - public void testGetEmptyClassNameAsNull() { - Map<String, String> map = new HashMap<String, String>(); - map.put(MOCK_SYSTEM_FACTORY_NAME1, ""); - map.put(MOCK_SYSTEM_FACTORY_NAME2, " "); - JavaSystemConfig systemConfig = new JavaSystemConfig(new MapConfig(map)); - - assertNull(systemConfig.getSystemFactory(MOCK_SYSTEM_NAME1)); - assertNull(systemConfig.getSystemFactory(MOCK_SYSTEM_NAME2)); - } - - @Test - public void testGetSystemNames() { - Map<String, String> map = new HashMap<String, String>(); - map.put(MOCK_SYSTEM_FACTORY_NAME1, MOCK_SYSTEM_FACTORY_CLASSNAME1); - map.put(MOCK_SYSTEM_FACTORY_NAME2, MOCK_SYSTEM_FACTORY_CLASSNAME2); - JavaSystemConfig systemConfig = new JavaSystemConfig(new MapConfig(map)); - - assertEquals(2, systemConfig.getSystemNames().size()); - assertTrue(systemConfig.getSystemNames().contains(MOCK_SYSTEM_NAME1)); - assertTrue(systemConfig.getSystemNames().contains(MOCK_SYSTEM_NAME2)); - } -} diff --git a/samza-core/src/test/java/org/apache/samza/config/TestSystemConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestSystemConfig.java new file mode 100644 index 0000000..5512bbc --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/config/TestSystemConfig.java @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.config; + +import com.google.common.collect.ImmutableMap; +import org.apache.samza.metrics.MetricsRegistry; +import org.apache.samza.system.SystemAdmin; +import org.apache.samza.system.SystemConsumer; +import org.apache.samza.system.SystemFactory; +import org.apache.samza.system.SystemProducer; +import org.junit.Test; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; + +public class TestSystemConfig { + private static final String MOCK_SYSTEM_NAME1 = "mocksystem1"; + private static final String MOCK_SYSTEM_NAME2 = "mocksystem2"; + private static final String MOCK_SYSTEM_FACTORY_NAME1 = + String.format(SystemConfig.SYSTEM_FACTORY_FORMAT, MOCK_SYSTEM_NAME1); + private static final String MOCK_SYSTEM_FACTORY_NAME2 = + String.format(SystemConfig.SYSTEM_FACTORY_FORMAT, MOCK_SYSTEM_NAME2); + private static final String MOCK_SYSTEM_FACTORY_CLASSNAME1 = "some.factory.Class1"; + private static final String MOCK_SYSTEM_FACTORY_CLASSNAME2 = "some.factory.Class2"; + + private static final String SAMZA_OFFSET_DEFAULT = "samza.offset.default"; + + /** + * Placeholder to help make sure the correct {@link SystemAdmin} is returned by {@link MockSystemFactory}. Do not mock + * any methods of this mock. + */ + private static final SystemAdmin SYSTEM_ADMIN1 = mock(SystemAdmin.class); + /** + * Placeholder to help make sure the correct {@link SystemAdmin} is returned by {@link MockSystemFactory}. Do not mock + * any methods of this mock. + */ + private static final SystemAdmin SYSTEM_ADMIN2 = mock(SystemAdmin.class); + + @Test + public void testGetSystemFactory() { + Map<String, String> map = new HashMap<>(); + map.put(MOCK_SYSTEM_FACTORY_NAME1, MOCK_SYSTEM_FACTORY_CLASSNAME1); + SystemConfig systemConfig = new SystemConfig(new MapConfig(map)); + + assertEquals(MOCK_SYSTEM_FACTORY_CLASSNAME1, systemConfig.getSystemFactory(MOCK_SYSTEM_NAME1).get()); + } + + @Test + public void testGetSystemFactoryEmptyClassName() { + Map<String, String> map = new HashMap<>(); + map.put(MOCK_SYSTEM_FACTORY_NAME1, ""); + map.put(MOCK_SYSTEM_FACTORY_NAME2, " "); + SystemConfig systemConfig = new SystemConfig(new MapConfig(map)); + + assertFalse(systemConfig.getSystemFactory(MOCK_SYSTEM_NAME1).isPresent()); + assertFalse(systemConfig.getSystemFactory(MOCK_SYSTEM_NAME2).isPresent()); + } + + @Test + public void testGetSystemNames() { + Map<String, String> map = new HashMap<>(); + map.put(MOCK_SYSTEM_FACTORY_NAME1, MOCK_SYSTEM_FACTORY_CLASSNAME1); + map.put(MOCK_SYSTEM_FACTORY_NAME2, MOCK_SYSTEM_FACTORY_CLASSNAME2); + SystemConfig systemConfig = new SystemConfig(new MapConfig(map)); + + assertEquals(2, systemConfig.getSystemNames().size()); + assertTrue(systemConfig.getSystemNames().contains(MOCK_SYSTEM_NAME1)); + assertTrue(systemConfig.getSystemNames().contains(MOCK_SYSTEM_NAME2)); + } + + @Test + public void testGetSystemAdmins() { + Map<String, String> map = ImmutableMap.of(MOCK_SYSTEM_FACTORY_NAME1, MockSystemFactory.class.getName()); + SystemConfig systemConfig = new SystemConfig(new MapConfig(map)); + Map<String, SystemAdmin> expected = ImmutableMap.of(MOCK_SYSTEM_NAME1, SYSTEM_ADMIN1); + assertEquals(expected, systemConfig.getSystemAdmins()); + } + + @Test + public void testGetSystemAdmin() { + Map<String, String> map = ImmutableMap.of(MOCK_SYSTEM_FACTORY_NAME1, MockSystemFactory.class.getName()); + SystemConfig systemConfig = new SystemConfig(new MapConfig(map)); + assertEquals(SYSTEM_ADMIN1, systemConfig.getSystemAdmin(MOCK_SYSTEM_NAME1)); + assertNull(systemConfig.getSystemAdmin(MOCK_SYSTEM_NAME2)); + } + + @Test + public void testGetSystemFactories() { + Map<String, String> map = ImmutableMap.of(MOCK_SYSTEM_FACTORY_NAME1, MockSystemFactory.class.getName()); + SystemConfig systemConfig = new SystemConfig(new MapConfig(map)); + Map<String, SystemFactory> actual = systemConfig.getSystemFactories(); + assertEquals(actual.size(), 1); + assertTrue(actual.get(MOCK_SYSTEM_NAME1) instanceof MockSystemFactory); + } + + @Test + public void testGetDefaultStreamProperties() { + String defaultStreamPrefix = buildDefaultStreamPropertiesPrefix(MOCK_SYSTEM_NAME1); + String system1ConfigKey = "config1-key"; + String system1ConfigValue = "config1-value"; + String system2ConfigKey = "config2-key"; + String system2ConfigValue = "config2-value"; + + Config config = new MapConfig(ImmutableMap.of(defaultStreamPrefix + system1ConfigKey, system1ConfigValue, + defaultStreamPrefix + system2ConfigKey, system2ConfigValue)); + Config expected = + new MapConfig(ImmutableMap.of(system1ConfigKey, system1ConfigValue, system2ConfigKey, system2ConfigValue)); + SystemConfig systemConfig = new SystemConfig(config); + assertEquals(expected, systemConfig.getDefaultStreamProperties(MOCK_SYSTEM_NAME1)); + assertEquals(new MapConfig(), systemConfig.getDefaultStreamProperties(MOCK_SYSTEM_NAME2)); + } + + @Test + public void testGetSystemOffsetDefault() { + String system1OffsetDefault = "offset-system-1"; + String system2OffsetDefault = "offset-system-2"; + Config config = new MapConfig(ImmutableMap.of( + // default.stream.samza.offset.default set + String.format(SystemConfig.SYSTEM_DEFAULT_STREAMS_PREFIX_FORMAT, MOCK_SYSTEM_NAME1) + SAMZA_OFFSET_DEFAULT, + system1OffsetDefault, + // should not use this value since default.stream.samza.offset.default is set + String.format(SystemConfig.SYSTEM_ID_PREFIX, MOCK_SYSTEM_NAME1) + SAMZA_OFFSET_DEFAULT, "wrong-value", + // only samza.offset.default set + String.format(SystemConfig.SYSTEM_ID_PREFIX, MOCK_SYSTEM_NAME2) + SAMZA_OFFSET_DEFAULT, system2OffsetDefault)); + SystemConfig systemConfig = new SystemConfig(config); + assertEquals(system1OffsetDefault, systemConfig.getSystemOffsetDefault(MOCK_SYSTEM_NAME1)); + assertEquals(system2OffsetDefault, systemConfig.getSystemOffsetDefault(MOCK_SYSTEM_NAME2)); + assertEquals(SystemConfig.SAMZA_SYSTEM_OFFSET_UPCOMING, systemConfig.getSystemOffsetDefault("other-system")); + } + + @Test + public void testGetSystemKeySerde() { + String system1KeySerde = "system1-key-serde"; + String defaultStreamPrefixSystem1 = buildDefaultStreamPropertiesPrefix(MOCK_SYSTEM_NAME1); + + // value specified explicitly + Config config = + new MapConfig(ImmutableMap.of(defaultStreamPrefixSystem1 + StreamConfig.KEY_SERDE(), system1KeySerde)); + SystemConfig systemConfig = new SystemConfig(config); + assertEquals(system1KeySerde, systemConfig.getSystemKeySerde(MOCK_SYSTEM_NAME1).get()); + + // default stream property is unspecified, try fall back config key + config = new MapConfig( + ImmutableMap.of(String.format(SystemConfig.SYSTEM_ID_PREFIX, MOCK_SYSTEM_NAME1) + StreamConfig.KEY_SERDE(), + system1KeySerde)); + systemConfig = new SystemConfig(config); + assertEquals(system1KeySerde, systemConfig.getSystemKeySerde(MOCK_SYSTEM_NAME1).get()); + + // default stream property is empty string, try fall back config key + config = new MapConfig(ImmutableMap.of( + // default stream property is empty + defaultStreamPrefixSystem1 + StreamConfig.KEY_SERDE(), "", + // fall back entry + String.format(SystemConfig.SYSTEM_ID_PREFIX, MOCK_SYSTEM_NAME1) + StreamConfig.KEY_SERDE(), system1KeySerde)); + systemConfig = new SystemConfig(config); + assertEquals(system1KeySerde, systemConfig.getSystemKeySerde(MOCK_SYSTEM_NAME1).get()); + + // default stream property is unspecified, fall back is also empty + config = new MapConfig( + ImmutableMap.of(String.format(SystemConfig.SYSTEM_ID_PREFIX, MOCK_SYSTEM_NAME1) + StreamConfig.KEY_SERDE(), + "")); + systemConfig = new SystemConfig(config); + assertFalse(systemConfig.getSystemKeySerde(MOCK_SYSTEM_NAME1).isPresent()); + + // default stream property is unspecified, fall back is also unspecified + config = new MapConfig(); + systemConfig = new SystemConfig(config); + assertFalse(systemConfig.getSystemKeySerde(MOCK_SYSTEM_NAME1).isPresent()); + } + + @Test + public void testGetSystemMsgSerde() { + String system1MsgSerde = "system1-msg-serde"; + String defaultStreamPrefixSystem1 = buildDefaultStreamPropertiesPrefix(MOCK_SYSTEM_NAME1); + + // value specified explicitly + Config config = + new MapConfig(ImmutableMap.of(defaultStreamPrefixSystem1 + StreamConfig.MSG_SERDE(), system1MsgSerde)); + SystemConfig systemConfig = new SystemConfig(config); + assertEquals(system1MsgSerde, systemConfig.getSystemMsgSerde(MOCK_SYSTEM_NAME1).get()); + + // default stream property is unspecified, try fall back config msg + config = new MapConfig( + ImmutableMap.of(String.format(SystemConfig.SYSTEM_ID_PREFIX, MOCK_SYSTEM_NAME1) + StreamConfig.MSG_SERDE(), + system1MsgSerde)); + systemConfig = new SystemConfig(config); + assertEquals(system1MsgSerde, systemConfig.getSystemMsgSerde(MOCK_SYSTEM_NAME1).get()); + + // default stream property is empty string, try fall back config msg + config = new MapConfig(ImmutableMap.of( + // default stream property is empty + defaultStreamPrefixSystem1 + StreamConfig.MSG_SERDE(), "", + // fall back entry + String.format(SystemConfig.SYSTEM_ID_PREFIX, MOCK_SYSTEM_NAME1) + StreamConfig.MSG_SERDE(), system1MsgSerde)); + systemConfig = new SystemConfig(config); + assertEquals(system1MsgSerde, systemConfig.getSystemMsgSerde(MOCK_SYSTEM_NAME1).get()); + + // default stream property is unspecified, fall back is also empty + config = new MapConfig( + ImmutableMap.of(String.format(SystemConfig.SYSTEM_ID_PREFIX, MOCK_SYSTEM_NAME1) + StreamConfig.MSG_SERDE(), + "")); + systemConfig = new SystemConfig(config); + assertFalse(systemConfig.getSystemMsgSerde(MOCK_SYSTEM_NAME1).isPresent()); + + // default stream property is unspecified, fall back is also unspecified + config = new MapConfig(); + systemConfig = new SystemConfig(config); + assertFalse(systemConfig.getSystemMsgSerde(MOCK_SYSTEM_NAME1).isPresent()); + } + + @Test + public void testDeleteCommittedMessages() { + Config config = new MapConfig(ImmutableMap.of( + // value is "true" + String.format(SystemConfig.DELETE_COMMITTED_MESSAGES, MOCK_SYSTEM_NAME1), "true", + // value is explicitly "false" + String.format(SystemConfig.DELETE_COMMITTED_MESSAGES, MOCK_SYSTEM_NAME2), "false")); + SystemConfig systemConfig = new SystemConfig(config); + assertTrue(systemConfig.deleteCommittedMessages(MOCK_SYSTEM_NAME1)); + assertFalse(systemConfig.deleteCommittedMessages(MOCK_SYSTEM_NAME2)); + assertFalse(systemConfig.deleteCommittedMessages("other-system")); // value is not specified + } + + public static class MockSystemFactory implements SystemFactory { + @Override + public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) { + throw new UnsupportedOperationException("Unnecessary for test"); + } + + @Override + public SystemProducer getProducer(String systemName, Config config, MetricsRegistry registry) { + throw new UnsupportedOperationException("Unnecessary for test"); + } + + @Override + public SystemAdmin getAdmin(String systemName, Config config) { + switch (systemName) { + case MOCK_SYSTEM_NAME1: + return SYSTEM_ADMIN1; + case MOCK_SYSTEM_NAME2: + return SYSTEM_ADMIN2; + default: + throw new UnsupportedOperationException("System name unsupported: " + systemName); + } + } + } + + private static String buildDefaultStreamPropertiesPrefix(String systemName) { + return String.format(SystemConfig.SYSTEM_DEFAULT_STREAMS_PREFIX_FORMAT, systemName); + } +} diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala index 244fd8f..93c7096 100644 --- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala +++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala @@ -84,8 +84,8 @@ class TestCheckpointTool extends AssertionsForJUnit with MockitoSugar { JobConfig.JOB_COORDINATOR_SYSTEM -> "coordinator", TaskConfig.INPUT_STREAMS -> "test.foo", TaskConfig.CHECKPOINT_MANAGER_FACTORY -> classOf[MockCheckpointManagerFactory].getName, - SystemConfig.SYSTEM_FACTORY.format("test") -> classOf[MockSystemFactory].getName, - SystemConfig.SYSTEM_FACTORY.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName, + SystemConfig.SYSTEM_FACTORY_FORMAT.format("test") -> classOf[MockSystemFactory].getName, + SystemConfig.SYSTEM_FACTORY_FORMAT.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName, TaskConfig.GROUPER_FACTORY -> "org.apache.samza.container.grouper.task.GroupByContainerCountFactory" ).asJava) config = JobPlanner.generateSingleJobConfig(userDefinedConfig) @@ -151,8 +151,8 @@ class TestCheckpointTool extends AssertionsForJUnit with MockitoSugar { JobConfig.JOB_COORDINATOR_SYSTEM -> "coordinator", TaskConfig.INPUT_STREAMS -> "test.foo", TaskConfig.CHECKPOINT_MANAGER_FACTORY -> classOf[MockCheckpointManagerFactory].getName, - SystemConfig.SYSTEM_FACTORY.format("test") -> classOf[MockSystemFactory].getName, - SystemConfig.SYSTEM_FACTORY.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName, + SystemConfig.SYSTEM_FACTORY_FORMAT.format("test") -> classOf[MockSystemFactory].getName, + SystemConfig.SYSTEM_FACTORY_FORMAT.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName, TaskConfig.GROUPER_FACTORY -> "org.apache.samza.container.grouper.task.GroupByContainerCountFactory" ).asJava) val generatedConfigs: MapConfig = JobPlanner.generateSingleJobConfig(userDefinedConfig) diff --git a/samza-core/src/test/scala/org/apache/samza/config/TestSystemConfig.scala b/samza-core/src/test/scala/org/apache/samza/config/TestSystemConfig.scala deleted file mode 100644 index cc54d00..0000000 --- a/samza-core/src/test/scala/org/apache/samza/config/TestSystemConfig.scala +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.config - -import scala.collection.JavaConverters._ -import org.apache.samza.config.SystemConfig.{Config2System, SYSTEM_FACTORY} -import org.junit.Assert._ -import org.junit.Test - -class TestSystemConfig { - val MOCK_SYSTEM_NAME1 = "mocksystem1" - val MOCK_SYSTEM_NAME2 = "mocksystem2" - val MOCK_SYSTEM_FACTORY_NAME1 = SYSTEM_FACTORY.format(MOCK_SYSTEM_NAME1) - val MOCK_SYSTEM_FACTORY_NAME2 = SYSTEM_FACTORY.format(MOCK_SYSTEM_NAME2) - val MOCK_SYSTEM_FACTORY_CLASSNAME1 = "some.factory.Class1" - val MOCK_SYSTEM_FACTORY_CLASSNAME2 = "some.factory.Class2" - - def testClassName { - val configMap = Map[String, String]( - MOCK_SYSTEM_FACTORY_NAME1 -> MOCK_SYSTEM_FACTORY_CLASSNAME1 - ) - val config = new MapConfig(configMap.asJava) - - assertEquals(MOCK_SYSTEM_FACTORY_CLASSNAME1, config.getSystemFactory(MOCK_SYSTEM_NAME1).getOrElse("")) - } - - @Test - def testGetEmptyClassNameAsNull { - val configMap = Map[String, String]( - MOCK_SYSTEM_FACTORY_NAME1 -> "", - MOCK_SYSTEM_FACTORY_NAME1 -> " " - ) - val config = new MapConfig(configMap.asJava) - - assertEquals(config.getSystemFactory(MOCK_SYSTEM_NAME1), None) - assertEquals(config.getSystemFactory(MOCK_SYSTEM_NAME2), None) - } - - def testGetSystemNames { - val configMap = Map[String, String]( - MOCK_SYSTEM_FACTORY_NAME1 -> MOCK_SYSTEM_FACTORY_CLASSNAME1, - MOCK_SYSTEM_FACTORY_NAME2 -> MOCK_SYSTEM_FACTORY_CLASSNAME2 - ) - val config = new MapConfig(configMap.asJava) - val systemNames = config.getSystemNames() - - assertTrue(systemNames.contains(MOCK_SYSTEM_NAME1)) - assertTrue(systemNames.contains(MOCK_SYSTEM_NAME2)) - } -} diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala index b7a9bec..5b43840 100644 --- a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala +++ b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala @@ -23,7 +23,7 @@ import java.util import org.apache.samza.Partition import org.apache.samza.checkpoint.TestCheckpointTool.MockCheckpointManagerFactory -import org.apache.samza.config.{JobConfig, MapConfig, SystemConfig, TaskConfig} +import org.apache.samza.config._ import org.apache.samza.container.{SamzaContainer, TaskName} import org.apache.samza.coordinator.stream.{CoordinatorStreamManager, MockCoordinatorStreamSystemFactory, MockCoordinatorStreamWrappedConsumer} import org.apache.samza.job.MockJobFactory @@ -91,8 +91,8 @@ class TestJobCoordinator extends FlatSpec with PrivateMethodTester { JobConfig.JOB_COORDINATOR_SYSTEM -> "coordinator", JobConfig.JOB_CONTAINER_COUNT -> "2", TaskConfig.INPUT_STREAMS -> "test.stream1", - SystemConfig.SYSTEM_FACTORY.format("test") -> classOf[MockSystemFactory].getCanonicalName, - SystemConfig.SYSTEM_FACTORY.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName, + SystemConfig.SYSTEM_FACTORY_FORMAT.format("test") -> classOf[MockSystemFactory].getCanonicalName, + SystemConfig.SYSTEM_FACTORY_FORMAT.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName, TaskConfig.GROUPER_FACTORY -> "org.apache.samza.container.grouper.task.GroupByContainerCountFactory", JobConfig.MONITOR_PARTITION_CHANGE -> "true", JobConfig.MONITOR_PARTITION_CHANGE_FREQUENCY_MS -> "100" @@ -153,8 +153,8 @@ class TestJobCoordinator extends FlatSpec with PrivateMethodTester { JobConfig.JOB_COORDINATOR_SYSTEM -> "coordinator", JobConfig.JOB_CONTAINER_COUNT -> "2", TaskConfig.INPUT_STREAMS -> "test.stream1", - SystemConfig.SYSTEM_FACTORY.format("test") -> classOf[MockSystemFactory].getCanonicalName, - SystemConfig.SYSTEM_FACTORY.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName, + SystemConfig.SYSTEM_FACTORY_FORMAT.format("test") -> classOf[MockSystemFactory].getCanonicalName, + SystemConfig.SYSTEM_FACTORY_FORMAT.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName, TaskConfig.GROUPER_FACTORY -> "org.apache.samza.container.grouper.task.GroupByContainerCountFactory" ) @@ -230,7 +230,7 @@ class TestJobCoordinator extends FlatSpec with PrivateMethodTester { */ @Test def testWithPartitionAssignmentWithMockJobFactory { - val config = new SystemConfig(getTestConfig(classOf[MockJobFactory])) + val config = getTestConfig(classOf[MockJobFactory]) val systemStream = new SystemStream("test", "stream1") val streamMetadataCache = mock(classOf[StreamMetadataCache]) when(streamMetadataCache.getStreamMetadata(Set(systemStream), true)).thenReturn( @@ -256,8 +256,8 @@ class TestJobCoordinator extends FlatSpec with PrivateMethodTester { JobConfig.STREAM_JOB_FACTORY_CLASS -> clazz.getCanonicalName, JobConfig.SSP_MATCHER_CLASS -> JobConfig.SSP_MATCHER_CLASS_REGEX, JobConfig.SSP_MATCHER_CONFIG_REGEX -> "[1]", - SystemConfig.SYSTEM_FACTORY.format("test") -> classOf[MockSystemFactory].getCanonicalName, - SystemConfig.SYSTEM_FACTORY.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName).asJava) + SystemConfig.SYSTEM_FACTORY_FORMAT.format("test") -> classOf[MockSystemFactory].getCanonicalName, + SystemConfig.SYSTEM_FACTORY_FORMAT.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName).asJava) config } diff --git a/samza-core/src/test/scala/org/apache/samza/system/TestRangeSystemStreamPartitionMatcher.scala b/samza-core/src/test/scala/org/apache/samza/system/TestRangeSystemStreamPartitionMatcher.scala index 3d385d6..e9e372d 100644 --- a/samza-core/src/test/scala/org/apache/samza/system/TestRangeSystemStreamPartitionMatcher.scala +++ b/samza-core/src/test/scala/org/apache/samza/system/TestRangeSystemStreamPartitionMatcher.scala @@ -52,7 +52,7 @@ class TestRangeSystemStreamPartitionMatcher { JobConfig.STREAM_JOB_FACTORY_CLASS -> classOf[ThreadJobFactory].getCanonicalName, JobConfig.SSP_MATCHER_CLASS -> JobConfig.SSP_MATCHER_CLASS_RANGE, JobConfig.SSP_MATCHER_CONFIG_RANGES -> range, - (SystemConfig.SYSTEM_FACTORY format "test") -> classOf[MockSystemFactory].getCanonicalName).asJava) + (SystemConfig.SYSTEM_FACTORY_FORMAT format "test") -> classOf[MockSystemFactory].getCanonicalName).asJava) } @Test @@ -99,7 +99,7 @@ class TestRangeSystemStreamPartitionMatcher { TaskConfig.INPUT_STREAMS -> "test.stream1", JobConfig.STREAM_JOB_FACTORY_CLASS -> classOf[ThreadJobFactory].getCanonicalName, JobConfig.SSP_MATCHER_CLASS -> JobConfig.SSP_MATCHER_CLASS_RANGE, - (SystemConfig.SYSTEM_FACTORY format "test") -> classOf[MockSystemFactory].getCanonicalName).asJava) + (SystemConfig.SYSTEM_FACTORY_FORMAT format "test") -> classOf[MockSystemFactory].getCanonicalName).asJava) new RangeSystemStreamPartitionMatcher().filter(sspSet.asJava, config) } diff --git a/samza-core/src/test/scala/org/apache/samza/system/TestRegexSystemStreamPartitionMatcher.scala b/samza-core/src/test/scala/org/apache/samza/system/TestRegexSystemStreamPartitionMatcher.scala index 255c85e..cb3b1e0 100644 --- a/samza-core/src/test/scala/org/apache/samza/system/TestRegexSystemStreamPartitionMatcher.scala +++ b/samza-core/src/test/scala/org/apache/samza/system/TestRegexSystemStreamPartitionMatcher.scala @@ -52,7 +52,7 @@ class TestRegexSystemStreamPartitionMatcher { JobConfig.STREAM_JOB_FACTORY_CLASS -> classOf[ThreadJobFactory].getCanonicalName, JobConfig.SSP_MATCHER_CLASS -> JobConfig.SSP_MATCHER_CLASS_REGEX, JobConfig.SSP_MATCHER_CONFIG_REGEX -> regex, - (SystemConfig.SYSTEM_FACTORY format "test") -> classOf[MockSystemFactory].getCanonicalName).asJava) + (SystemConfig.SYSTEM_FACTORY_FORMAT format "test") -> classOf[MockSystemFactory].getCanonicalName).asJava) } @Test @@ -70,7 +70,7 @@ class TestRegexSystemStreamPartitionMatcher { TaskConfig.INPUT_STREAMS -> "test.stream1", JobConfig.STREAM_JOB_FACTORY_CLASS -> classOf[ThreadJobFactory].getCanonicalName, JobConfig.SSP_MATCHER_CLASS -> JobConfig.SSP_MATCHER_CONFIG_REGEX, - (SystemConfig.SYSTEM_FACTORY format "test") -> classOf[MockSystemFactory].getCanonicalName).asJava) + (SystemConfig.SYSTEM_FACTORY_FORMAT format "test") -> classOf[MockSystemFactory].getCanonicalName).asJava) new RegexSystemStreamPartitionMatcher().filter(sspSet.asJava, config) } diff --git a/samza-kafka/src/main/java/org/apache/samza/config/KafkaConsumerConfig.java b/samza-kafka/src/main/java/org/apache/samza/config/KafkaConsumerConfig.java index dea60b3..0b59871 100644 --- a/samza-kafka/src/main/java/org/apache/samza/config/KafkaConsumerConfig.java +++ b/samza-kafka/src/main/java/org/apache/samza/config/KafkaConsumerConfig.java @@ -82,7 +82,7 @@ public class KafkaConsumerConfig extends HashMap<String, Object> { consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // check if samza default offset value is defined - String systemOffsetDefault = new JavaSystemConfig(config).getSystemOffsetDefault(systemName); + String systemOffsetDefault = new SystemConfig(config).getSystemOffsetDefault(systemName); // Translate samza config value to kafka config value String autoOffsetReset = getAutoOffsetResetValue((String) consumerProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), systemOffsetDefault); @@ -214,10 +214,10 @@ public class KafkaConsumerConfig extends HashMap<String, Object> { String newAutoOffsetReset = KAFKA_OFFSET_LATEST; if (!StringUtils.isBlank(samzaOffsetDefault)) { switch (samzaOffsetDefault) { - case JavaSystemConfig.SAMZA_SYSTEM_OFFSET_UPCOMING: + case SystemConfig.SAMZA_SYSTEM_OFFSET_UPCOMING: newAutoOffsetReset = KAFKA_OFFSET_LATEST; break; - case JavaSystemConfig.SAMZA_SYSTEM_OFFSET_OLDEST: + case SystemConfig.SAMZA_SYSTEM_OFFSET_OLDEST: newAutoOffsetReset = KAFKA_OFFSET_EARLIEST; break; default: diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java index f4db408..0ffd1a7 100644 --- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java +++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java @@ -48,9 +48,9 @@ import org.apache.kafka.common.errors.TopicExistsException; import org.apache.samza.Partition; import org.apache.samza.SamzaException; import org.apache.samza.config.Config; +import org.apache.samza.config.SystemConfig; import org.apache.samza.config.KafkaConfig; import org.apache.samza.config.MapConfig; -import org.apache.samza.config.SystemConfig; import org.apache.samza.system.StreamSpec; import org.apache.samza.system.StreamValidationException; import org.apache.samza.system.SystemAdmin; diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala index 2999800..efbe780 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala @@ -26,6 +26,7 @@ import org.apache.samza.config._ import org.apache.samza.metrics.MetricsRegistry import org.apache.samza.system.{StreamSpec, SystemFactory} import org.apache.samza.system.kafka.KafkaStreamSpec +import org.apache.samza.util.ScalaJavaUtil.JavaOptionals import org.apache.samza.util.{KafkaUtil, Logging, Util, _} class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Logging { @@ -38,9 +39,10 @@ class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Loggin val checkpointSystemName = kafkaConfig.getCheckpointSystem.getOrElse( throw new SamzaException("No system defined for Kafka's checkpoint manager.")) - val checkpointSystemFactoryName = new SystemConfig(config) - .getSystemFactory(checkpointSystemName) - .getOrElse(throw new SamzaException("Missing configuration: " + SystemConfig.SYSTEM_FACTORY format checkpointSystemName)) + val systemConfig = new SystemConfig(config) + val checkpointSystemFactoryName = JavaOptionals.toRichOptional(systemConfig.getSystemFactory(checkpointSystemName)) + .toOption + .getOrElse(throw new SamzaException("Missing configuration: " + SystemConfig.SYSTEM_FACTORY_FORMAT format checkpointSystemName)) val checkpointSystemFactory = Util.getObj(checkpointSystemFactoryName, classOf[SystemFactory]) val checkpointTopic = KafkaUtil.getCheckpointTopic(jobName, jobId, config) diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala index 607feb0..10f879c 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala @@ -23,15 +23,13 @@ package org.apache.samza.config import java.util import java.util.concurrent.TimeUnit import java.util.regex.Pattern -import java.util.{Properties, UUID} +import java.util.Properties import com.google.common.collect.ImmutableMap -import kafka.consumer.ConsumerConfig import org.apache.kafka.clients.producer.ProducerConfig import org.apache.kafka.common.serialization.ByteArraySerializer import org.apache.samza.SamzaException import org.apache.samza.config.ApplicationConfig.ApplicationMode -import org.apache.samza.config.SystemConfig.Config2System import org.apache.samza.util.{Logging, StreamUtil} import scala.collection.JavaConverters._ @@ -68,7 +66,7 @@ object KafkaConfig { * Defines how low a queue can get for a single system/stream/partition * combination before trying to fetch more messages for it. */ - val CONSUMER_FETCH_THRESHOLD = SystemConfig.SYSTEM_PREFIX + "samza.fetch.threshold" + val CONSUMER_FETCH_THRESHOLD = SystemConfig.SYSTEM_ID_PREFIX + "samza.fetch.threshold" val DEFAULT_CHECKPOINT_SEGMENT_BYTES = 26214400 @@ -79,7 +77,7 @@ object KafkaConfig { * the bytes limit + size of max message in the partition for a given stream. * If the value of this property is > 0 then this takes precedence over CONSUMER_FETCH_THRESHOLD config. */ - val CONSUMER_FETCH_THRESHOLD_BYTES = SystemConfig.SYSTEM_PREFIX + "samza.fetch.threshold.bytes" + val CONSUMER_FETCH_THRESHOLD_BYTES = SystemConfig.SYSTEM_ID_PREFIX + "samza.fetch.threshold.bytes" val DEFAULT_RETENTION_MS_FOR_BATCH = TimeUnit.DAYS.toMillis(1) @@ -113,7 +111,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) { } private def getSystemDefaultReplicationFactor(systemName: String, defaultValue: String) = { - val defaultReplicationFactor = new JavaSystemConfig(config).getDefaultStreamProperties(systemName).getOrDefault(KafkaConfig.TOPIC_REPLICATION_FACTOR, defaultValue) + val defaultReplicationFactor = new SystemConfig(config).getDefaultStreamProperties(systemName).getOrDefault(KafkaConfig.TOPIC_REPLICATION_FACTOR, defaultValue) defaultReplicationFactor } @@ -127,7 +125,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) { * Note that the checkpoint-system has a similar precedence. See [[getCheckpointSystem]] */ def getCheckpointSegmentBytes() = { - val defaultsegBytes = new JavaSystemConfig(config).getDefaultStreamProperties(getCheckpointSystem.orNull).getInt(KafkaConfig.SEGMENT_BYTES, KafkaConfig.DEFAULT_CHECKPOINT_SEGMENT_BYTES) + val defaultsegBytes = new SystemConfig(config).getDefaultStreamProperties(getCheckpointSystem.orNull).getInt(KafkaConfig.SEGMENT_BYTES, KafkaConfig.DEFAULT_CHECKPOINT_SEGMENT_BYTES) getInt(KafkaConfig.CHECKPOINT_SEGMENT_BYTES, defaultsegBytes) } @@ -144,7 +142,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) { case Some(rplFactor) => rplFactor case _ => val coordinatorSystem = new JobConfig(config).getCoordinatorSystemNameOrNull - val systemReplicationFactor = new JavaSystemConfig(config).getDefaultStreamProperties(coordinatorSystem).getOrDefault(KafkaConfig.TOPIC_REPLICATION_FACTOR, "3") + val systemReplicationFactor = new SystemConfig(config).getDefaultStreamProperties(coordinatorSystem).getOrDefault(KafkaConfig.TOPIC_REPLICATION_FACTOR, "3") systemReplicationFactor } @@ -161,7 +159,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) { case Some(segBytes) => segBytes case _ => val coordinatorSystem = new JobConfig(config).getCoordinatorSystemNameOrNull - val segBytes = new JavaSystemConfig(config).getDefaultStreamProperties(coordinatorSystem).getOrDefault(KafkaConfig.SEGMENT_BYTES, "26214400") + val segBytes = new SystemConfig(config).getDefaultStreamProperties(coordinatorSystem).getOrDefault(KafkaConfig.SEGMENT_BYTES, "26214400") segBytes } @@ -240,7 +238,6 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) { storageConfig.getChangelogStream(storeName).foreach(changelogName => { val systemStream = StreamUtil.getSystemStreamFromNames(changelogName) - val factoryName = config.getSystemFactory(systemStream.getSystem).getOrElse(new SamzaException("Unable to determine factory for system: " + systemStream.getSystem)) storeToChangelog += storeName -> systemStream.getStream }) } diff --git a/samza-kafka/src/main/scala/org/apache/samza/config_deprecated/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config_deprecated/KafkaConfig.scala index 02a6275..9b5cb31 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/config_deprecated/KafkaConfig.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/config_deprecated/KafkaConfig.scala @@ -32,7 +32,6 @@ import org.apache.kafka.common.serialization.ByteArraySerializer import org.apache.samza.SamzaException import org.apache.samza.config.ApplicationConfig.ApplicationMode import org.apache.samza.config._ -import org.apache.samza.config.SystemConfig.Config2System import org.apache.samza.util.{Logging, StreamUtil} import scala.collection.JavaConverters._ @@ -72,7 +71,7 @@ object KafkaConfig { * Defines how low a queue can get for a single system/stream/partition * combination before trying to fetch more messages for it. */ - val CONSUMER_FETCH_THRESHOLD = SystemConfig.SYSTEM_PREFIX + "samza.fetch.threshold" + val CONSUMER_FETCH_THRESHOLD = SystemConfig.SYSTEM_ID_PREFIX + "samza.fetch.threshold" val DEFAULT_CHECKPOINT_SEGMENT_BYTES = 26214400 @@ -83,7 +82,7 @@ object KafkaConfig { * the bytes limit + size of max message in the partition for a given stream. * If the value of this property is > 0 then this takes precedence over CONSUMER_FETCH_THRESHOLD config. */ - val CONSUMER_FETCH_THRESHOLD_BYTES = SystemConfig.SYSTEM_PREFIX + "samza.fetch.threshold.bytes" + val CONSUMER_FETCH_THRESHOLD_BYTES = SystemConfig.SYSTEM_ID_PREFIX + "samza.fetch.threshold.bytes" val DEFAULT_RETENTION_MS_FOR_BATCH = TimeUnit.DAYS.toMillis(1) @@ -117,7 +116,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) { } private def getSystemDefaultReplicationFactor(systemName: String, defaultValue: String) = { - val defaultReplicationFactor = new JavaSystemConfig(config).getDefaultStreamProperties(systemName).getOrDefault(KafkaConfig.TOPIC_REPLICATION_FACTOR, defaultValue) + val defaultReplicationFactor = new SystemConfig(config).getDefaultStreamProperties(systemName).getOrDefault(KafkaConfig.TOPIC_REPLICATION_FACTOR, defaultValue) defaultReplicationFactor } @@ -131,7 +130,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) { * Note that the checkpoint-system has a similar precedence. See [[getCheckpointSystem]] */ def getCheckpointSegmentBytes() = { - val defaultsegBytes = new JavaSystemConfig(config).getDefaultStreamProperties(getCheckpointSystem.orNull).getInt(KafkaConfig.SEGMENT_BYTES, KafkaConfig.DEFAULT_CHECKPOINT_SEGMENT_BYTES) + val defaultsegBytes = new SystemConfig(config).getDefaultStreamProperties(getCheckpointSystem.orNull).getInt(KafkaConfig.SEGMENT_BYTES, KafkaConfig.DEFAULT_CHECKPOINT_SEGMENT_BYTES) getInt(KafkaConfig.CHECKPOINT_SEGMENT_BYTES, defaultsegBytes) } @@ -148,7 +147,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) { case Some(rplFactor) => rplFactor case _ => val coordinatorSystem = new JobConfig(config).getCoordinatorSystemNameOrNull - val systemReplicationFactor = new JavaSystemConfig(config).getDefaultStreamProperties(coordinatorSystem).getOrDefault(KafkaConfig.TOPIC_REPLICATION_FACTOR, "3") + val systemReplicationFactor = new SystemConfig(config).getDefaultStreamProperties(coordinatorSystem).getOrDefault(KafkaConfig.TOPIC_REPLICATION_FACTOR, "3") systemReplicationFactor } @@ -165,7 +164,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) { case Some(segBytes) => segBytes case _ => val coordinatorSystem = new JobConfig(config).getCoordinatorSystemNameOrNull - val segBytes = new JavaSystemConfig(config).getDefaultStreamProperties(coordinatorSystem).getOrDefault(KafkaConfig.SEGMENT_BYTES, "26214400") + val segBytes = new SystemConfig(config).getDefaultStreamProperties(coordinatorSystem).getOrDefault(KafkaConfig.SEGMENT_BYTES, "26214400") segBytes } @@ -244,7 +243,6 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) { storageConfig.getChangelogStream(storeName).foreach(changelogName => { val systemStream = StreamUtil.getSystemStreamFromNames(changelogName) - val factoryName = config.getSystemFactory(systemStream.getSystem).getOrElse(new SamzaException("Unable to determine factory for system: " + systemStream.getSystem)) storeToChangelog += storeName -> systemStream.getStream }) } diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemFactory.scala index 7e8509d..b4003d8 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemFactory.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemFactory.scala @@ -33,7 +33,6 @@ import org.apache.samza.system.SystemFactory import org.apache.samza.config.StorageConfig._ import org.apache.samza.system.SystemProducer import org.apache.samza.system.SystemAdmin -import org.apache.samza.config.SystemConfig.Config2System import org.apache.samza.system.SystemConsumer object KafkaSystemFactory extends Logging { @@ -129,7 +128,8 @@ class KafkaSystemFactory extends SystemFactory with Logging { (topicName, changelogInfo) }} - val deleteCommittedMessages = config.deleteCommittedMessages(systemName) + val systemConfig = new SystemConfig(config) + val deleteCommittedMessages = systemConfig.deleteCommittedMessages(systemName) val intermediateStreamProperties: Map[String, Properties] = getIntermediateStreamProperties(config) new KafkaSystemAdmin( systemName, diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala index 0a0aae8..dbe82fc 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala @@ -26,9 +26,7 @@ import kafka.integration.KafkaServerTestHarness import kafka.server.ConfigType import kafka.utils.{CoreUtils, TestUtils, ZkUtils} import com.google.common.collect.ImmutableMap -import org.apache.kafka.clients.producer.ProducerConfig import org.apache.kafka.common.security.JaasUtils -import org.apache.kafka.common.serialization.ByteArraySerializer import org.apache.samza.checkpoint.Checkpoint import org.apache.samza.config._ import org.apache.samza.container.TaskName @@ -37,6 +35,7 @@ import org.apache.samza.metrics.MetricsRegistry import org.apache.samza.serializers.CheckpointSerde import org.apache.samza.system._ import org.apache.samza.system.kafka.{KafkaStreamSpec, KafkaSystemFactory} +import org.apache.samza.util.ScalaJavaUtil.JavaOptionals import org.apache.samza.util.{KafkaUtilException, NoOpMetricsRegistry, Util} import org.apache.samza.{Partition, SamzaException} import org.junit.Assert._ @@ -209,9 +208,9 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness { val systemName = kafkaConfig.getCheckpointSystem.getOrElse( throw new SamzaException("No system defined for Kafka's checkpoint manager.")) - val systemFactoryClassName = new SystemConfig(config) - .getSystemFactory(systemName) - .getOrElse(throw new SamzaException("Missing configuration: " + SystemConfig.SYSTEM_FACTORY format systemName)) + val systemConfig = new SystemConfig(config) + val systemFactoryClassName = JavaOptionals.toRichOptional(systemConfig.getSystemFactory(systemName)).toOption + .getOrElse(throw new SamzaException("Missing configuration: " + SystemConfig.SYSTEM_FACTORY_FORMAT format systemName)) val systemFactory = Util.getObj(systemFactoryClassName, classOf[SystemFactory]) diff --git a/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java b/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java index 5824489..12c03f6 100644 --- a/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java +++ b/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java @@ -26,7 +26,7 @@ import org.apache.samza.system.SystemStream; * This class contains the methods for getting properties that are needed by the * StreamAppender. */ -public class Log4jSystemConfig extends JavaSystemConfig { +public class Log4jSystemConfig extends SystemConfig { private static final String LOCATION_ENABLED = "task.log4j.location.info.enabled"; private static final String TASK_LOG4J_SYSTEM = "task.log4j.system"; diff --git a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java index b4a97f7..19e6ea6 100644 --- a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java +++ b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java @@ -286,7 +286,6 @@ public class StreamAppender extends AppenderSkeleton { protected void setupSystem() { config = getConfig(); - SystemFactory systemFactory = null; Log4jSystemConfig log4jSystemConfig = new Log4jSystemConfig(config); if (streamName == null) { @@ -298,12 +297,10 @@ public class StreamAppender extends AppenderSkeleton { metrics = new StreamAppenderMetrics("stream-appender", metricsRegistry); String systemName = log4jSystemConfig.getSystemName(); - String systemFactoryName = log4jSystemConfig.getSystemFactory(systemName); - if (systemFactoryName != null) { - systemFactory = Util.getObj(systemFactoryName, SystemFactory.class); - } else { - throw new SamzaException("Could not figure out \"" + systemName + "\" system factory for log4j StreamAppender to use"); - } + String systemFactoryName = log4jSystemConfig.getSystemFactory(systemName) + .orElseThrow(() -> new SamzaException( + "Could not figure out \"" + systemName + "\" system factory for log4j StreamAppender to use")); + SystemFactory systemFactory = Util.getObj(systemFactoryName, SystemFactory.class); setSerde(log4jSystemConfig, systemName, streamName); diff --git a/samza-log4j2/src/main/java/org/apache/samza/config/Log4jSystemConfig.java b/samza-log4j2/src/main/java/org/apache/samza/config/Log4jSystemConfig.java index 5824489..12c03f6 100644 --- a/samza-log4j2/src/main/java/org/apache/samza/config/Log4jSystemConfig.java +++ b/samza-log4j2/src/main/java/org/apache/samza/config/Log4jSystemConfig.java @@ -26,7 +26,7 @@ import org.apache.samza.system.SystemStream; * This class contains the methods for getting properties that are needed by the * StreamAppender. */ -public class Log4jSystemConfig extends JavaSystemConfig { +public class Log4jSystemConfig extends SystemConfig { private static final String LOCATION_ENABLED = "task.log4j.location.info.enabled"; private static final String TASK_LOG4J_SYSTEM = "task.log4j.system"; diff --git a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java index 28f759e..a224762 100644 --- a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java +++ b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java @@ -307,7 +307,6 @@ public class StreamAppender extends AbstractAppender { protected void setupSystem() { config = getConfig(); - SystemFactory systemFactory = null; Log4jSystemConfig log4jSystemConfig = new Log4jSystemConfig(config); if (streamName == null) { @@ -319,12 +318,10 @@ public class StreamAppender extends AbstractAppender { metrics = new StreamAppenderMetrics("stream-appender", metricsRegistry); String systemName = log4jSystemConfig.getSystemName(); - String systemFactoryName = log4jSystemConfig.getSystemFactory(systemName); - if (systemFactoryName != null) { - systemFactory = Util.getObj(systemFactoryName, SystemFactory.class); - } else { - throw new SamzaException("Could not figure out \"" + systemName + "\" system factory for log4j StreamAppender to use"); - } + String systemFactoryName = log4jSystemConfig.getSystemFactory(systemName) + .orElseThrow(() -> new SamzaException( + "Could not figure out \"" + systemName + "\" system factory for log4j StreamAppender to use")); + SystemFactory systemFactory = Util.getObj(systemFactoryName, SystemFactory.class); setSerde(log4jSystemConfig, systemName, streamName); diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/system/descriptors/InMemorySystemDescriptor.java b/samza-test/src/main/java/org/apache/samza/test/framework/system/descriptors/InMemorySystemDescriptor.java index a74d0ba..da61f74 100644 --- a/samza-test/src/main/java/org/apache/samza/test/framework/system/descriptors/InMemorySystemDescriptor.java +++ b/samza-test/src/main/java/org/apache/samza/test/framework/system/descriptors/InMemorySystemDescriptor.java @@ -28,7 +28,7 @@ import org.apache.samza.system.descriptors.SystemDescriptor; import org.apache.samza.serializers.Serde; import org.apache.samza.system.SystemStreamMetadata; import org.apache.samza.system.inmemory.InMemorySystemFactory; -import org.apache.samza.config.JavaSystemConfig; +import org.apache.samza.config.SystemConfig; /** * Descriptor for an InMemorySystem. @@ -87,7 +87,7 @@ public class InMemorySystemDescriptor extends SystemDescriptor<InMemorySystemDes public Map<String, String> toConfig() { HashMap<String, String> configs = new HashMap<>(super.toConfig()); configs.put(InMemorySystemConfig.INMEMORY_SCOPE, this.inMemoryScope); - configs.put(String.format(JavaSystemConfig.SYSTEM_FACTORY_FORMAT, getSystemName()), FACTORY_CLASS_NAME); + configs.put(String.format(SystemConfig.SYSTEM_FACTORY_FORMAT, getSystemName()), FACTORY_CLASS_NAME); return configs; } diff --git a/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerWithSamzaBench.java b/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerWithSamzaBench.java index cc8cceb..683ced4 100644 --- a/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerWithSamzaBench.java +++ b/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerWithSamzaBench.java @@ -32,10 +32,10 @@ import java.util.stream.IntStream; import org.apache.commons.cli.ParseException; import org.apache.samza.application.StreamApplication; import org.apache.samza.config.ApplicationConfig; +import org.apache.samza.config.SystemConfig; import org.apache.samza.config.JobConfig; import org.apache.samza.config.JobCoordinatorConfig; import org.apache.samza.config.MapConfig; -import org.apache.samza.config.SystemConfig; import org.apache.samza.config.TaskConfig; import org.apache.samza.system.descriptors.GenericInputDescriptor; import org.apache.samza.system.descriptors.GenericSystemDescriptor;