Repository: incubator-samza Updated Branches: refs/heads/master 377e5cc3f -> 0ebfcbd0a
SAMZA-226; auto-create changelogs Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/0ebfcbd0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/0ebfcbd0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/0ebfcbd0 Branch: refs/heads/master Commit: 0ebfcbd0ab39b2e1db4bc31e6cd38ee3113469fe Parents: 377e5cc Author: Naveen Somasundaram <[email protected]> Authored: Mon Dec 22 09:30:38 2014 -0800 Committer: Chris Riccomini <[email protected]> Committed: Mon Dec 22 09:30:38 2014 -0800 ---------------------------------------------------------------------- .../versioned/jobs/configuration-table.html | 18 +++ .../java/org/apache/samza/config/Config.java | 17 +++ .../org/apache/samza/system/SystemAdmin.java | 7 ++ ...inglePartitionWithoutOffsetsSystemAdmin.java | 6 + .../apache/samza/container/SamzaContainer.scala | 38 ++----- .../samza/storage/TaskStorageManager.scala | 40 +++++-- .../filereader/FileReaderSystemAdmin.scala | 4 + .../samza/checkpoint/TestOffsetManager.scala | 4 + .../samza/coordinator/TestJobCoordinator.scala | 2 + .../kafka/KafkaCheckpointManagerFactory.scala | 4 +- .../org/apache/samza/config/KafkaConfig.scala | 46 ++++++++ .../samza/system/kafka/KafkaSystemAdmin.scala | 114 +++++++++++++++++-- .../samza/system/kafka/KafkaSystemFactory.scala | 26 ++++- .../scala/org/apache/samza/util/KafkaUtil.scala | 4 +- .../apache/samza/config/TestKafkaConfig.scala | 18 +++ .../system/kafka/TestKafkaSystemAdmin.scala | 11 +- .../samza/system/mock/MockSystemAdmin.java | 8 +- .../test/integration/TestStatefulTask.scala | 24 ++-- .../yarn/TestSamzaAppMasterTaskManager.scala | 2 + 19 files changed, 322 insertions(+), 71 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0ebfcbd0/docs/learn/documentation/versioned/jobs/configuration-table.html 
---------------------------------------------------------------------- diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html index 6a5d209..4ccc0e7 100644 --- a/docs/learn/documentation/versioned/jobs/configuration-table.html +++ b/docs/learn/documentation/versioned/jobs/configuration-table.html @@ -772,6 +772,24 @@ </tr> <tr> + <td class="property" id="store-changelog-replication-factor">stores.<span class="store">store-name</span>.changelog.<br>replication.factor</td> + <td class="default">2</td> + <td class="description"> + Defines the number of replicas to use for the changelog stream. + </td> + </tr> + + <tr> + <td class="property" id="store-changelog-kafka-properties">stores.<span class="store">store-name</span>.changelog.<br>kafka.topic-level-property</td> + <td class="default"></td> + <td class="description"> + Specifies a topic-level setting for the changelog topic that Samza creates. By default, changelog topics are created with cleanup.policy=compact and segment.bytes=536870912. + For example, to change the cleanup policy, specify "stores.mystore.changelog.kafka.cleanup.policy=delete". + Please refer to http://kafka.apache.org/documentation.html#configuration for more topic-level configurations.
+ </td> + </tr> + + <tr> <th colspan="3" class="section" id="regex-rewriter"> Consuming all Kafka topics matching a regular expression<br> <span class="subtitle"> http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0ebfcbd0/samza-api/src/main/java/org/apache/samza/config/Config.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/config/Config.java b/samza-api/src/main/java/org/apache/samza/config/Config.java index 2048e90..9f7ade0 100644 --- a/samza-api/src/main/java/org/apache/samza/config/Config.java +++ b/samza-api/src/main/java/org/apache/samza/config/Config.java @@ -26,6 +26,8 @@ import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; /** * Store and retrieve named, typed values as configuration for classes implementing this interface. @@ -52,6 +54,21 @@ public abstract class Config implements Map<String, String> { return new MapConfig(out); } + public Config regexSubset(String regex) { + Map<String, String> out = new HashMap<String, String>(); + Pattern pattern = Pattern.compile(regex); + + for (Entry<String, String> entry : entrySet()) { + String k = entry.getKey(); + Matcher matcher = pattern.matcher(k); + if(matcher.find()){ + out.put(k, entry.getValue()); + } + } + + return new MapConfig(out); + } + public String get(String k, String defaultString) { if (!containsKey(k)) { return defaultString; http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0ebfcbd0/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java index 571c606..8995ba3 100644 --- a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java +++ 
b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java @@ -50,4 +50,11 @@ public interface SystemAdmin { * requested in the parameter set. */ Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames); + + /** + * An API to create a change log stream + * @param streamName The name of the stream to be created in the underlying stream + * @param numOfPartitions The number of partitions in the changelog stream + */ + void createChangelogStream(String streamName, int numOfPartitions); } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0ebfcbd0/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java b/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java index 38e313f..01997ae 100644 --- a/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java +++ b/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java @@ -23,6 +23,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; import org.apache.samza.Partition; +import org.apache.samza.SamzaException; import org.apache.samza.system.SystemAdmin; import org.apache.samza.system.SystemStreamMetadata; import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata; @@ -54,6 +55,11 @@ public class SinglePartitionWithoutOffsetsSystemAdmin implements SystemAdmin { } @Override + public void createChangelogStream(String streamName, int numOfPartitions){ + throw new SamzaException("Method not implemented"); + } + + @Override public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets) { Map<SystemStreamPartition, String> offsetsAfter = new HashMap<SystemStreamPartition, String>(); 
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0ebfcbd0/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index 2f1568d..8a6d865 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -20,7 +20,6 @@ package org.apache.samza.container import java.io.File -import org.apache.samza.Partition import org.apache.samza.SamzaException import org.apache.samza.checkpoint.{ CheckpointManagerFactory, OffsetManager } import org.apache.samza.config.Config @@ -47,7 +46,6 @@ import org.apache.samza.system.SystemFactory import org.apache.samza.system.SystemProducers import org.apache.samza.system.SystemProducersMetrics import org.apache.samza.system.SystemStream -import org.apache.samza.system.SystemStreamMetadata import org.apache.samza.system.SystemStreamPartition import org.apache.samza.system.chooser.DefaultChooser import org.apache.samza.system.chooser.MessageChooserFactory @@ -58,11 +56,8 @@ import org.apache.samza.util.Logging import org.apache.samza.util.Util import scala.collection.JavaConversions._ import java.net.URL -import org.apache.samza.coordinator.server.JobServlet -import org.apache.samza.job.model.ContainerModel -import org.apache.samza.coordinator.JobCoordinator +import org.apache.samza.job.model.{TaskModel, ContainerModel, JobModel} import org.apache.samza.serializers.model.SamzaObjectMapper -import org.apache.samza.job.model.JobModel import org.apache.samza.config.JobConfig.Config2Job object SamzaContainer extends Logging { @@ -263,10 +258,6 @@ object SamzaContainer extends Logging { info("Got change log system streams: %s" format changeLogSystemStreams) - val changeLogMetadata = 
streamMetadataCache.getStreamMetadata(changeLogSystemStreams.values.toSet) - - info("Got change log stream metadata: %s" format changeLogMetadata) - val serdeManager = new SerdeManager( serdes = serdes, systemKeySerdes = systemKeySerdes, @@ -387,6 +378,13 @@ object SamzaContainer extends Logging { val containerContext = new SamzaContainerContext(containerId, config, taskNames) + // Changelog partition count = highest changelog partition id among this container's tasks, plus 1 (ids start at 0). + // foldLeft(-1) yields 0 for a container with no tasks, where Iterable.max would throw. NOTE(review): this is + // container-local; containers owning different task subsets may request different counts, so a job-wide max is safer. + val maxChangeLogStreamPartitions = containerModel.getTasks.values + .map(_.getChangelogPartition.getPartitionId) + .foldLeft(-1)(_ max _) + 1 + val taskInstances: Map[TaskName, TaskInstance] = containerModel.getTasks.values.map(taskModel => { debug("Setting up task instance: %s" format taskModel) @@ -440,18 +438,16 @@ object SamzaContainer extends Logging { info("Got task stores: %s" format taskStores) - val changeLogOldestOffsets = getChangeLogOldestOffsetsForPartition(taskModel.getChangelogPartition, changeLogMetadata) - - info("Assigning oldest change log offsets for taskName %s: %s" format (taskName, changeLogOldestOffsets)) - val storageManager = new TaskStorageManager( taskName = taskName, taskStores = taskStores, storeConsumers = storeConsumers, changeLogSystemStreams = changeLogSystemStreams, - changeLogOldestOffsets = changeLogOldestOffsets, + changeLogStreamPartitions = maxChangeLogStreamPartitions, + streamMetadataCache = streamMetadataCache, storeBaseDir = storeBaseDir, - partition = taskModel.getChangelogPartition) + partition = taskModel.getChangelogPartition, + systemAdmins = systemAdmins) val systemStreamPartitions = taskModel .getSystemStreamPartitions @@ -494,16 +490,6 @@ object SamzaContainer extends Logging { reporters = reporters, jvm = jvm) } - - /** - * Builds a map from SystemStreamPartition to oldest offset for changelogs.
- */ - def getChangeLogOldestOffsetsForPartition(partition: Partition, inputStreamMetadata: Map[SystemStream, SystemStreamMetadata]): Map[SystemStream, String] = { - inputStreamMetadata - .mapValues(_.getSystemStreamPartitionMetadata.get(partition)) - .filter(_._2 != null) - .mapValues(_.getOldestOffset) - } } class SamzaContainer( http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0ebfcbd0/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala index b8719c3..f68a7fe 100644 --- a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala +++ b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala @@ -23,10 +23,7 @@ import java.io.File import scala.collection.Map import org.apache.samza.util.Logging import org.apache.samza.Partition -import org.apache.samza.system.SystemConsumer -import org.apache.samza.system.SystemStream -import org.apache.samza.system.SystemStreamPartition -import org.apache.samza.system.SystemStreamPartitionIterator +import org.apache.samza.system._ import org.apache.samza.util.Util import org.apache.samza.SamzaException import org.apache.samza.container.TaskName @@ -50,16 +47,20 @@ class TaskStorageManager( taskStores: Map[String, StorageEngine] = Map(), storeConsumers: Map[String, SystemConsumer] = Map(), changeLogSystemStreams: Map[String, SystemStream] = Map(), - changeLogOldestOffsets: Map[SystemStream, String] = Map(), + changeLogStreamPartitions: Int, + streamMetadataCache: StreamMetadataCache, storeBaseDir: File = new File(System.getProperty("user.dir"), "state"), - partition: Partition) extends Logging { + partition: Partition, + systemAdmins: Map[String, SystemAdmin]) extends Logging { var taskStoresToRestore = taskStores + var 
changeLogOldestOffsets: Map[SystemStream, String] = Map() def apply(storageEngineName: String) = taskStores(storageEngineName) def init { cleanBaseDirs + createStreams startConsumers restoreStores stopConsumers @@ -67,7 +68,6 @@ class TaskStorageManager( private def cleanBaseDirs { debug("Cleaning base directories for stores.") - taskStores.keys.foreach(storeName => { val storagePartitionDir = TaskStorageManager.getStorePartitionDir(storeBaseDir, storeName, taskName) @@ -78,6 +78,21 @@ class TaskStorageManager( }) } + private def createStreams = { + info("Creating streams that are not present for changelog") + + for ((storeName, systemStream) <- changeLogSystemStreams) { + val systemAdmin = systemAdmins.getOrElse(systemStream.getSystem, throw new SamzaException("Unable to get systemAdmin for store " + storeName + " and systemStream " + systemStream)) + systemAdmin.createChangelogStream(systemStream.getStream, changeLogStreamPartitions) + } + + val changeLogMetadata = streamMetadataCache.getStreamMetadata(changeLogSystemStreams.values.toSet) + info("Got change log stream metadata: %s" format changeLogMetadata) + + changeLogOldestOffsets = getChangeLogOldestOffsetsForPartition(partition, changeLogMetadata) + info("Assigning oldest change log offsets for taskName %s: %s" format (taskName, changeLogOldestOffsets)) + } + private def startConsumers { debug("Starting consumers for stores.") @@ -131,4 +146,15 @@ class TaskStorageManager( taskStores.values.foreach(_.stop) } + + + /** + * Builds a map from SystemStreamPartition to oldest offset for changelogs.
+ */ + private def getChangeLogOldestOffsetsForPartition(partition: Partition, inputStreamMetadata: Map[SystemStream, SystemStreamMetadata]): Map[SystemStream, String] = { + inputStreamMetadata + .mapValues(_.getSystemStreamPartitionMetadata.get(partition)) + .filter(_._2 != null) + .mapValues(_.getOldestOffset) + } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0ebfcbd0/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala b/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala index 98e92bc..ec1d749 100644 --- a/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala +++ b/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala @@ -130,4 +130,8 @@ class FileReaderSystemAdmin extends SystemAdmin with Logging { }) enterPosition } + + override def createChangelogStream(topicName: String, numOfChangeLogPartitions: Int) = { + throw new SamzaException("Method not implemented") + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0ebfcbd0/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala index 35e7f6b..d18d4c4 100644 --- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala +++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala @@ -257,6 +257,10 @@ class TestOffsetManager { def getSystemStreamMetadata(streamNames: java.util.Set[String]) = Map[String, SystemStreamMetadata]() + + override def 
createChangelogStream(topicName: String, numOfChangeLogPartitions: Int) = { + throw new SamzaException("Method not implemented") + } } } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0ebfcbd0/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala index 1eb0eda..a8e5d36 100644 --- a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala +++ b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala @@ -116,4 +116,6 @@ class MockSystemAdmin extends SystemAdmin { new Partition(2) -> new SystemStreamPartitionMetadata(null, null, null)) Map(streamNames.toList.head -> new SystemStreamMetadata("foo", partitionMetadata)) } + + override def createChangelogStream(streamName: String, numOfPartitions: Int) = ??? } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0ebfcbd0/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala index f7db2a1..f2defbd 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala @@ -50,8 +50,8 @@ object KafkaCheckpointManagerFactory { "message.send.max.retries" -> (Integer.MAX_VALUE - 1).toString) // Set the checkpoint topic configs to have a very small segment size and - // enable log compaction.
This keeps job startup time small since there - // are fewer useless (overwritten) messages to read from the checkpoint + // enable log compaction. This keeps job startup time small since there + // are fewer useless (overwritten) messages to read from the checkpoint // topic. def getCheckpointTopicProperties(config: Config) = { val segmentBytes = config http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0ebfcbd0/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala index 9fc1f56..e57b8ba 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala @@ -19,11 +19,20 @@ package org.apache.samza.config + +import java.util.regex.Pattern + +import org.apache.samza.SamzaException +import org.apache.samza.util.Util + import scala.collection.JavaConversions._ import kafka.consumer.ConsumerConfig import java.util.Properties import kafka.producer.ProducerConfig import java.util.UUID +import scala.collection.JavaConverters._ +import org.apache.samza.system.kafka.KafkaSystemFactory +import org.apache.samza.config.SystemConfig.Config2System object KafkaConfig { val REGEX_RESOLVED_STREAMS = "job.config.rewriter.%s.regex" @@ -34,6 +43,14 @@ object KafkaConfig { val CHECKPOINT_REPLICATION_FACTOR = "task.checkpoint.replication.factor" val CHECKPOINT_SEGMENT_BYTES = "task.checkpoint.segment.bytes" + val CHANGELOG_STREAM_REPLICATION_FACTOR = "stores.%s.changelog.replication.factor" + val CHANGELOG_STREAM_KAFKA_SETTINGS = "stores.%s.changelog.kafka." 
+ // The default segment size to use for changelog topics + val CHANGELOG_DEFAULT_SEGMENT_SIZE = "536870912" + + // Helper regular expression definitions to extract/match configurations + val CHANGELOG_STREAM_NAMES_REGEX = "stores\\.(.*)\\.changelog$" + /** * Defines how low a queue can get for a single system/stream/partition * combination before trying to fetch more messages for it. @@ -84,6 +101,35 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) { def getRegexResolvedStreams(rewriterName: String) = getOption(KafkaConfig.REGEX_RESOLVED_STREAMS format rewriterName) def getRegexResolvedSystem(rewriterName: String) = getOption(KafkaConfig.REGEX_RESOLVED_SYSTEM format rewriterName) def getRegexResolvedInheritedConfig(rewriterName: String) = config.subset((KafkaConfig.REGEX_INHERITED_CONFIG format rewriterName) + ".", true) + def getChangelogStreamReplicationFactor(name: String) = getOption(KafkaConfig.CHANGELOG_STREAM_REPLICATION_FACTOR format name) + + // Returns a map from store names to changelog topic names, for stores whose changelog system uses the Kafka system factory. + def getKafkaChangelogEnabledStores() = { + val changelogConfigs = config.regexSubset(KafkaConfig.CHANGELOG_STREAM_NAMES_REGEX).asScala + var storeToChangelog = Map[String, String]() + for((changelogConfig, changelogName) <- changelogConfigs){ + // Look up the factory for this stream's system (failing fast if none is configured) and check whether it is Kafka + val systemStream = Util.getSystemStreamFromNames(changelogName) + val factoryName = config.getSystemFactory(systemStream.getSystem).getOrElse(throw new SamzaException("Unable to determine factory for system: " + systemStream.getSystem)) + if(classOf[KafkaSystemFactory].getCanonicalName == factoryName){ + val pattern = Pattern.compile(KafkaConfig.CHANGELOG_STREAM_NAMES_REGEX) + val matcher = pattern.matcher(changelogConfig) + val storeName = if(matcher.find()) matcher.group(1) else throw new SamzaException("Unable to find store name in the changelog
configuration: " + changelogConfig + " with SystemStream: " + systemStream) + storeToChangelog += storeName -> systemStream.getStream + } + } + storeToChangelog + } + + // Get all kafka properties for changelog stream topic creation + def getChangelogKafkaProperties(name: String) = { + val filteredConfigs = config.subset(KafkaConfig.CHANGELOG_STREAM_KAFKA_SETTINGS format name, true) + val kafkaChangeLogProperties = new Properties + kafkaChangeLogProperties.setProperty("cleanup.policy", "compact") + kafkaChangeLogProperties.setProperty("segment.bytes", KafkaConfig.CHANGELOG_DEFAULT_SEGMENT_SIZE) + filteredConfigs.foreach{kv => kafkaChangeLogProperties.setProperty(kv._1, kv._2)} + kafkaChangeLogProperties + } // kafka config def getKafkaSystemConsumerConfig( http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0ebfcbd0/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala index 5ac33ea..b790be1 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala @@ -19,25 +19,21 @@ package org.apache.samza.system.kafka +import org.I0Itec.zkclient.ZkClient import org.apache.samza.Partition import org.apache.samza.SamzaException import org.apache.samza.system.SystemAdmin import org.apache.samza.system.SystemStreamMetadata import org.apache.samza.system.SystemStreamPartition -import org.apache.samza.util.ClientUtilTopicMetadataStore -import org.apache.samza.util.ExponentialSleepStrategy +import org.apache.samza.util.{ClientUtilTopicMetadataStore, ExponentialSleepStrategy, Logging} import kafka.api._ import kafka.consumer.SimpleConsumer -import kafka.utils.Utils -import 
kafka.client.ClientUtils -import kafka.common.TopicAndPartition -import kafka.common.ErrorMapping -import kafka.cluster.Broker -import org.apache.samza.util.Logging -import java.util.UUID +import kafka.common.{TopicExistsException, TopicAndPartition, ErrorMapping} +import java.util.{Properties, UUID} import scala.collection.JavaConversions._ import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata import kafka.consumer.ConsumerConfig +import kafka.admin.AdminUtils object KafkaSystemAdmin extends Logging { /** @@ -73,6 +69,13 @@ object KafkaSystemAdmin extends Logging { } /** + * A helper class that is used to construct the changelog stream specific information + * @param replicationFactor The number of replicas for the changelog stream + * @param kafkaProps The kafka specific properties that need to be used for changelog stream creation + */ +case class ChangelogInfo(var replicationFactor: Int, var kafkaProps: Properties) + +/** * A Kafka-based implementation of SystemAdmin. */ class KafkaSystemAdmin( @@ -106,7 +109,20 @@ class KafkaSystemAdmin( * The client ID to use for the simple consumer when fetching metadata from * Kafka. Equivalent to Kafka's client.id configuration. 
*/ - clientId: String = UUID.randomUUID.toString) extends SystemAdmin with Logging { + clientId: String = UUID.randomUUID.toString, + + /** + * A function that returns a Zookeeper client to connect to fetch the meta + * data information + */ + connectZk: () => ZkClient, + + /** + * Replication factor for the Changelog topic in kafka + * Kafka properties to be used during the Changelog topic creation + */ + topicMetaInformation: Map[String, ChangelogInfo] = Map[String, ChangelogInfo]() + ) extends SystemAdmin with Logging { import KafkaSystemAdmin._ @@ -259,4 +275,82 @@ class KafkaSystemAdmin( offsets } + + private def createTopicInKafka(topicName: String, numKafkaChangelogPartitions: Int) { + val retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy + info("Attempting to create change log topic %s." format topicName) + info("Using partition count "+ numKafkaChangelogPartitions + " for creating change log topic") + val topicMetaInfo = topicMetaInformation.getOrElse(topicName, throw new KafkaChangelogException("Unable to find topic information for topic " + topicName)) + retryBackoff.run( + loop => { + val zkClient = connectZk() + try { + AdminUtils.createTopic( + zkClient, + topicName, + numKafkaChangelogPartitions, + topicMetaInfo.replicationFactor, + topicMetaInfo.kafkaProps) + } finally { + zkClient.close + } + + info("Created changelog topic %s." format topicName) + loop.done + }, + + (exception, loop) => { + exception match { + case e: TopicExistsException => + info("Changelog topic %s already exists." format topicName) + loop.done + case e: Exception => + warn("Failed to create topic %s: %s. Retrying." format(topicName, e)) + debug("Exception detail:", e) + } + } + ) + } + + private def validateTopicInKafka(topicName: String, numKafkaChangelogPartitions: Int) { + val retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy + info("Validating changelog topic %s." 
format topicName) + retryBackoff.run( + loop => { + val metadataStore = new ClientUtilTopicMetadataStore(brokerListString, clientId, timeout) + val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(topicName), systemName, metadataStore.getTopicInfo) + val topicMetadata = topicMetadataMap(topicName) + ErrorMapping.maybeThrowException(topicMetadata.errorCode) + + val partitionCount = topicMetadata.partitionsMetadata.length + if (partitionCount < numKafkaChangelogPartitions) { + throw new KafkaChangelogException("Changelog topic validation failed for topic %s because partition count %d is less than the expected partition count of %d" format(topicName, partitionCount, numKafkaChangelogPartitions)) + } + + info("Successfully validated changelog topic %s." format topicName) + loop.done + }, + + (exception, loop) => { + exception match { + case e: KafkaChangelogException => throw e + case e: Exception => + warn("While trying to validate topic %s: %s. Retrying." format(topicName, e)) + debug("Exception detail:", e) + } + } + ) + } + + /** + * Exception to be thrown when the change log stream creation or validation has failed + */ + class KafkaChangelogException(s: String, t: Throwable) extends SamzaException(s, t) { + def this(s: String) = this(s, null) + } + + override def createChangelogStream(topicName: String, numKafkaChangelogPartitions: Int) = { + createTopicInKafka(topicName, numKafkaChangelogPartitions) + validateTopicInKafka(topicName, numKafkaChangelogPartitions) + } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0ebfcbd0/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala index 4ed5e88..4506ea3 100644 ---
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala @@ -19,17 +19,21 @@ package org.apache.samza.system.kafka -import org.apache.samza.util.KafkaUtil + +import java.util.Properties + +import kafka.utils.ZKStringSerializer +import org.I0Itec.zkclient.ZkClient +import org.apache.samza.util.{Logging, KafkaUtil, ExponentialSleepStrategy, ClientUtilTopicMetadataStore} import org.apache.samza.config.Config import org.apache.samza.metrics.MetricsRegistry import org.apache.samza.config.KafkaConfig.Config2Kafka import org.apache.samza.SamzaException import kafka.producer.Producer import org.apache.samza.system.SystemFactory -import org.apache.samza.util.ExponentialSleepStrategy -import org.apache.samza.util.ClientUtilTopicMetadataStore -class KafkaSystemFactory extends SystemFactory { + +class KafkaSystemFactory extends SystemFactory with Logging { def getConsumer(systemName: String, config: Config, registry: MetricsRegistry) = { val clientId = KafkaUtil.getClientId("samza-consumer", config) val metrics = new KafkaSystemConsumerMetrics(systemName, registry) @@ -95,12 +99,24 @@ class KafkaSystemFactory extends SystemFactory { val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, clientId) val timeout = consumerConfig.socketTimeoutMs val bufferSize = consumerConfig.socketReceiveBufferBytes + val storeToChangelog = config.getKafkaChangelogEnabledStores() + + // Construct the meta information for each topic, if the replication factor is not defined, we use 2 as the number of replicas for the change log stream. 
+ val topicMetaInformation = storeToChangelog.map{case (storeName, topicName) => + { + val replicationFactor = config.getChangelogStreamReplicationFactor(storeName).getOrElse("2").toInt + val changelogInfo = ChangelogInfo(replicationFactor, config.getChangelogKafkaProperties(storeName)) + info("Creating topic meta information for topic: " + topicName + " with replication factor: " + replicationFactor) + (topicName, changelogInfo) + }}.toMap new KafkaSystemAdmin( systemName, brokerListString, timeout, bufferSize, - clientId) + clientId, + () => new ZkClient(consumerConfig.zkConnect, 6000, 6000, ZKStringSerializer), + topicMetaInformation) } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0ebfcbd0/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala index d660b91..f1b7511 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala @@ -19,11 +19,9 @@ package org.apache.samza.util -import org.apache.samza.config.{KafkaConfig, Config, ConfigException} +import org.apache.samza.config.{Config, ConfigException} import org.apache.samza.config.JobConfig.Config2Job import java.util.concurrent.atomic.AtomicLong -import kafka.client.ClientUtils -import org.apache.samza.SamzaException object KafkaUtil { val counter = new AtomicLong(0) http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0ebfcbd0/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala index 8109f73..0e1c38e 100644 --- 
a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala @@ -104,4 +104,22 @@ class TestKafkaConfig { // topic fetch size assertEquals(256*256, consumerConfig2 getOrElse ("topic1", 1024*1024)) } + + @Test + def testChangeLogProperties() { + val props = (new Properties /: Map( + "systems.kafka.samza.factory" -> "org.apache.samza.system.kafka.KafkaSystemFactory", + "stores.test1.changelog" -> "kafka.mychangelog1", + "stores.test2.changelog" -> "kafka.mychangelog2", + "stores.test1.changelog.kafka.cleanup.policy" -> "delete" + )) { case (props, (k, v)) => props.put(k, v); props } + + val mapConfig = new MapConfig(props.toMap[String, String]) + val kafkaConfig = new KafkaConfig(mapConfig) + assertEquals(kafkaConfig.getChangelogKafkaProperties("test1").getProperty("cleanup.policy"), "delete") + assertEquals(kafkaConfig.getChangelogKafkaProperties("test2").getProperty("cleanup.policy"), "compact") + val storeToChangelog = kafkaConfig.getKafkaChangelogEnabledStores() + assertEquals(storeToChangelog.get("test1").getOrElse(""), "mychangelog1") + assertEquals(storeToChangelog.get("test2").getOrElse(""), "mychangelog2") + } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0ebfcbd0/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala index 5ceb109..c759a7b 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala @@ -164,6 +164,10 @@ object TestKafkaSystemAdmin { class TestKafkaSystemAdmin { import TestKafkaSystemAdmin._ + val systemName = "test" + // Provide a 
random zkAddress, the system admin tries to connect only when a topic is created/validated + val systemAdmin = new KafkaSystemAdmin(systemName, brokers, connectZk = () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer)) + def testShouldAssembleMetadata { val oldestOffsets = Map( new SystemStreamPartition("test", "stream1", new Partition(0)) -> "o1", @@ -205,8 +209,6 @@ class TestKafkaSystemAdmin { @Test def testShouldGetOldestNewestAndNextOffsets { - val systemName = "test" - val systemAdmin = new KafkaSystemAdmin(systemName, brokers) // Create an empty topic with 50 partitions, but with no offsets. createTopic @@ -271,7 +273,7 @@ class TestKafkaSystemAdmin { @Test def testNonExistentTopic { - val systemAdmin = new KafkaSystemAdmin("test", brokers) + val initialOffsets = systemAdmin.getSystemStreamMetadata(Set("non-existent-topic")) val metadata = initialOffsets.getOrElse("non-existent-topic", fail("missing metadata")) assertEquals(metadata, new SystemStreamMetadata("non-existent-topic", Map( @@ -280,7 +282,6 @@ class TestKafkaSystemAdmin { @Test def testOffsetsAfter { - val systemAdmin = new KafkaSystemAdmin("test", brokers) val ssp1 = new SystemStreamPartition("test-system", "test-stream", new Partition(0)) val ssp2 = new SystemStreamPartition("test-system", "test-stream", new Partition(1)) val offsetsAfter = systemAdmin.getOffsetsAfter(Map( @@ -290,7 +291,7 @@ class TestKafkaSystemAdmin { assertEquals("3", offsetsAfter(ssp2)) } - class KafkaSystemAdminWithTopicMetadataError extends KafkaSystemAdmin("test", brokers) { + class KafkaSystemAdminWithTopicMetadataError extends KafkaSystemAdmin("test", brokers, connectZk = () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer)) { import kafka.api.{ TopicMetadata, TopicMetadataResponse } // Simulate Kafka telling us that the leader for the topic is not available http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0ebfcbd0/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java 
---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java b/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java index fa1d51b..c0a20af 100644 --- a/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java +++ b/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java @@ -23,6 +23,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; import org.apache.samza.Partition; +import org.apache.samza.SamzaException; import org.apache.samza.system.SystemAdmin; import org.apache.samza.system.SystemStreamMetadata; import org.apache.samza.system.SystemStreamPartition; @@ -53,7 +54,12 @@ public class MockSystemAdmin implements SystemAdmin { return metadata; } - @Override + @Override + public void createChangelogStream(String streamName, int numOfPartitions) { + throw new SamzaException("Method not implemented"); + } + + @Override public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets) { Map<SystemStreamPartition, String> offsetsAfter = new HashMap<SystemStreamPartition, String>(); http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0ebfcbd0/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala ---------------------------------------------------------------------- diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala index 118f5ee..ca25258 100644 --- a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala +++ b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala @@ -22,6 +22,7 @@ package org.apache.samza.test.integration import java.util.Properties import java.util.concurrent.CountDownLatch import java.util.concurrent.TimeUnit +import 
java.util.logging.{Level, LogManager, Handler, Logger} import kafka.admin.AdminUtils import kafka.common.ErrorMapping @@ -69,7 +70,8 @@ import scala.collection.mutable.SynchronizedMap object TestStatefulTask { val INPUT_TOPIC = "input" - val STATE_TOPIC = "mystore" + val STORE_NAME = "mystore" + val STATE_TOPIC_STREAM = "mystoreChangelog" val TOTAL_TASK_NAMES = 1 val REPLICATION_FACTOR = 3 @@ -87,6 +89,9 @@ object TestStatefulTask { val props1 = TestUtils.createBrokerConfig(brokerId1, port1) val props2 = TestUtils.createBrokerConfig(brokerId2, port2) val props3 = TestUtils.createBrokerConfig(brokerId3, port3) + props1.setProperty("auto.create.topics.enable","false") + props2.setProperty("auto.create.topics.enable","false") + props3.setProperty("auto.create.topics.enable","false") val config = new java.util.Properties() val brokers = "localhost:%d,localhost:%d,localhost:%d" format (port1, port2, port3) @@ -124,16 +129,10 @@ object TestStatefulTask { INPUT_TOPIC, TOTAL_TASK_NAMES, REPLICATION_FACTOR) - - AdminUtils.createTopic( - zkClient, - STATE_TOPIC, - TOTAL_TASK_NAMES, - REPLICATION_FACTOR) } def validateTopics { - val topics = Set(STATE_TOPIC, INPUT_TOPIC) + val topics = Set(INPUT_TOPIC) var done = false var retries = 0 @@ -207,7 +206,8 @@ class TestStatefulTask { "stores.mystore.factory" -> "org.apache.samza.storage.kv.KeyValueStorageEngineFactory", "stores.mystore.key.serde" -> "string", "stores.mystore.msg.serde" -> "string", - "stores.mystore.changelog" -> "kafka.mystore", + "stores.mystore.changelog" -> "kafka.mystoreChangelog", + "stores.mystore.changelog.replication.factor" -> "1", "systems.kafka.samza.factory" -> "org.apache.samza.system.kafka.KafkaSystemFactory", // Always start consuming at offset 0. This avoids a race condition between @@ -249,7 +249,7 @@ class TestStatefulTask { send(task, "-99") // Validate that messages appear in store stream. 
- val messages = readAll(STATE_TOPIC, 5, "testShouldStartTaskForFirstTime") + val messages = readAll(STATE_TOPIC_STREAM, 5, "testShouldStartTaskForFirstTime") assertEquals(6, messages.length) assertEquals("1", messages(0)) @@ -291,7 +291,7 @@ class TestStatefulTask { send(task, "5") // Validate that messages appear in store stream. - val messages = readAll(STATE_TOPIC, 14, "testShouldRestoreStore") + val messages = readAll(STATE_TOPIC_STREAM, 14, "testShouldRestoreStore") assertEquals(15, messages.length) // From initial start. @@ -424,7 +424,7 @@ class TestTask extends StreamTask with InitableTask { def init(config: Config, context: TaskContext) { TestTask.register(context.getTaskName, this) store = context - .getStore(TestStatefulTask.STATE_TOPIC) + .getStore(TestStatefulTask.STORE_NAME) .asInstanceOf[KeyValueStore[String, String]] val iter = store.all restored ++= iter http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0ebfcbd0/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala index cab5101..58f2464 100644 --- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala +++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala @@ -421,4 +421,6 @@ class MockSystemAdmin(numTasks: Int) extends SystemAdmin { streamName -> new SystemStreamMetadata(streamName, partitionMetadata) }).toMap[String, SystemStreamMetadata] } + + override def createChangelogStream(streamName: String, numOfPartitions: Int) = ??? }
