Repository: incubator-samza Updated Branches: refs/heads/0.8.0 d7680cab1 -> cc5f3105a
Revert "SAMZA-226; auto-create changelog streams" This reverts commit d7680cab1ca0c977c56e931548a80728af58578a. Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/cc5f3105 Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/cc5f3105 Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/cc5f3105 Branch: refs/heads/0.8.0 Commit: cc5f3105af452c9950f73949d800c0ae0b7760fa Parents: d7680ca Author: Chris Riccomini <[email protected]> Authored: Tue Nov 25 08:44:55 2014 -0800 Committer: Chris Riccomini <[email protected]> Committed: Tue Nov 25 08:44:55 2014 -0800 ---------------------------------------------------------------------- .reviewboardrc | 2 +- .../versioned/jobs/configuration-table.html | 8 - .../java/org/apache/samza/config/Config.java | 17 --- .../org/apache/samza/system/SystemAdmin.java | 7 - ...inglePartitionWithoutOffsetsSystemAdmin.java | 6 - .../apache/samza/container/SamzaContainer.scala | 30 ++-- .../samza/storage/TaskStorageManager.scala | 41 +----- .../filereader/FileReaderSystemAdmin.scala | 4 - .../samza/checkpoint/TestOffsetManager.scala | 4 - .../org/apache/samza/config/KafkaConfig.scala | 35 ----- .../samza/system/kafka/KafkaSystemAdmin.scala | 145 ++++--------------- .../samza/system/kafka/KafkaSystemFactory.scala | 16 +- .../scala/org/apache/samza/util/KafkaUtil.scala | 4 +- .../system/kafka/TestKafkaSystemAdmin.scala | 12 +- .../samza/system/mock/MockSystemAdmin.java | 8 +- .../test/integration/TestStatefulTask.scala | 14 +- 16 files changed, 71 insertions(+), 282 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/cc5f3105/.reviewboardrc ---------------------------------------------------------------------- diff --git a/.reviewboardrc b/.reviewboardrc index c561205..9339119 100644 --- a/.reviewboardrc +++ b/.reviewboardrc @@ -2,4 +2,4 @@ REVIEWBOARD_URL = 'https://reviews.apache.org' REPOSITORY = 'samza' GUESS_DESCRIPTION = True TARGET_GROUPS = 'samza' -TRACKING_BRANCH = 'origin/0.8.0' +TRACKING_BRANCH = 'origin/master' http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/cc5f3105/docs/learn/documentation/versioned/jobs/configuration-table.html ---------------------------------------------------------------------- diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html index 06c5f05..4266a13 100644 --- a/docs/learn/documentation/versioned/jobs/configuration-table.html +++ b/docs/learn/documentation/versioned/jobs/configuration-table.html @@ -783,14 +783,6 @@ </tr> <tr> - <td class="property" id="store-changelog-partitions">stores.<span class="store">store-name</span>.changelog.<br>replication.factor</td> - <td class="default">2</td> - <td class="description"> - The property defines the number of replicas to use for the change log stream. - </td> - </tr> - - <tr> <th colspan="3" class="section" id="regex-rewriter"> Consuming all Kafka topics matching a regular expression<br> <span class="subtitle"> http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/cc5f3105/samza-api/src/main/java/org/apache/samza/config/Config.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/config/Config.java b/samza-api/src/main/java/org/apache/samza/config/Config.java index 9f7ade0..2048e90 100644 --- a/samza-api/src/main/java/org/apache/samza/config/Config.java +++ b/samza-api/src/main/java/org/apache/samza/config/Config.java @@ -26,8 +26,6 @@ import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.regex.Matcher; -import java.util.regex.Pattern; /** * Store and retrieve named, typed values as configuration for classes implementing this interface. @@ -54,21 +52,6 @@ public abstract class Config implements Map<String, String> { return new MapConfig(out); } - public Config regexSubset(String regex) { - Map<String, String> out = new HashMap<String, String>(); - Pattern pattern = Pattern.compile(regex); - - for (Entry<String, String> entry : entrySet()) { - String k = entry.getKey(); - Matcher matcher = pattern.matcher(k); - if(matcher.find()){ - out.put(k, entry.getValue()); - } - } - - return new MapConfig(out); - } - public String get(String k, String defaultString) { if (!containsKey(k)) { return defaultString; http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/cc5f3105/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java index 8995ba3..571c606 100644 --- a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java +++ b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java @@ -50,11 +50,4 @@ public interface SystemAdmin { * requested in the parameter set. */ Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames); - - /** - * An API to create a change log stream - * @param streamName The name of the stream to be created in the underlying stream - * @param numOfPartitions The number of partitions in the changelog stream - */ - void createChangelogStream(String streamName, int numOfPartitions); } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/cc5f3105/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java b/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java index 01997ae..38e313f 100644 --- a/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java +++ b/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java @@ -23,7 +23,6 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; import org.apache.samza.Partition; -import org.apache.samza.SamzaException; import org.apache.samza.system.SystemAdmin; import org.apache.samza.system.SystemStreamMetadata; import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata; @@ -55,11 +54,6 @@ public class SinglePartitionWithoutOffsetsSystemAdmin implements SystemAdmin { } @Override - public void createChangelogStream(String streamName, int numOfPartitions){ - throw new SamzaException("Method not implemented"); - } - - @Override public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets) { Map<SystemStreamPartition, String> offsetsAfter = new HashMap<SystemStreamPartition, String>(); http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/cc5f3105/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index 8a39766..d91d6d7 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -275,6 +275,10 @@ object SamzaContainer extends Logging { info("Got change log system streams: %s" format changeLogSystemStreams) + val changeLogMetadata = streamMetadataCache.getStreamMetadata(changeLogSystemStreams.values.toSet) + + info("Got change log stream metadata: %s" format changeLogMetadata) + val serdeManager = new SerdeManager( serdes = serdes, systemKeySerdes = systemKeySerdes, @@ -409,12 +413,6 @@ object SamzaContainer extends Logging { val containerContext = new SamzaContainerContext(containerName, config, taskNames) - // compute the number of partitions necessary for the change log stream creation - var maxChangeLogStreamPartitions = -1 - taskNameToChangeLogPartitionMapping.map(x => if(maxChangeLogStreamPartitions < x._2) maxChangeLogStreamPartitions = x._2) - // Increment by 1 because partition starts from 0, but we need the absolute count, this value is used for change log topic creation. - maxChangeLogStreamPartitions = maxChangeLogStreamPartitions + 1 - val taskInstances: Map[TaskName, TaskInstance] = taskNames.map(taskName => { debug("Setting up task instance: %s" format taskName) @@ -468,16 +466,18 @@ object SamzaContainer extends Logging { info("Got task stores: %s" format taskStores) + val changeLogOldestOffsets = getChangeLogOldestOffsetsForPartition(partitionForThisTaskName, changeLogMetadata) + + info("Assigning oldest change log offsets for taskName %s: %s" format (taskName, changeLogOldestOffsets)) + val storageManager = new TaskStorageManager( taskName = taskName, taskStores = taskStores, storeConsumers = storeConsumers, changeLogSystemStreams = changeLogSystemStreams, - maxChangeLogStreamPartitions, - streamMetadataCache = streamMetadataCache, + changeLogOldestOffsets = changeLogOldestOffsets, storeBaseDir = storeBaseDir, - partitionForThisTaskName, - systemAdmins = systemAdmins) + partitionForThisTaskName) val systemStreamPartitions: Set[SystemStreamPartition] = sspTaskNames.getOrElse(taskName, throw new SamzaException("Can't find taskName " + taskName + " in map of SystemStreamPartitions: " + sspTaskNames)) @@ -519,6 +519,16 @@ object SamzaContainer extends Logging { reporters = reporters, jvm = jvm) } + + /** + * Builds a map from SystemStreamPartition to oldest offset for changelogs. + */ + def getChangeLogOldestOffsetsForPartition(partition: Partition, inputStreamMetadata: Map[SystemStream, SystemStreamMetadata]): Map[SystemStream, String] = { + inputStreamMetadata + .mapValues(_.getSystemStreamPartitionMetadata.get(partition)) + .filter(_._2 != null) + .mapValues(_.getOldestOffset) + } } class SamzaContainer( http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/cc5f3105/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala index af0928e..b8719c3 100644 --- a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala +++ b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala @@ -23,7 +23,10 @@ import java.io.File import scala.collection.Map import org.apache.samza.util.Logging import org.apache.samza.Partition -import org.apache.samza.system._ +import org.apache.samza.system.SystemConsumer +import org.apache.samza.system.SystemStream +import org.apache.samza.system.SystemStreamPartition +import org.apache.samza.system.SystemStreamPartitionIterator import org.apache.samza.util.Util import org.apache.samza.SamzaException import org.apache.samza.container.TaskName @@ -47,20 +50,16 @@ class TaskStorageManager( taskStores: Map[String, StorageEngine] = Map(), storeConsumers: Map[String, SystemConsumer] = Map(), changeLogSystemStreams: Map[String, SystemStream] = Map(), - changeLogStreamPartitions: Int, - streamMetadataCache: StreamMetadataCache, + changeLogOldestOffsets: Map[SystemStream, String] = Map(), storeBaseDir: File = new File(System.getProperty("user.dir"), "state"), - partition: Partition, - systemAdmins: Map[String, SystemAdmin]) extends Logging { + partition: Partition) extends Logging { var taskStoresToRestore = taskStores - var changeLogOldestOffsets: Map[SystemStream, String] = Map() def apply(storageEngineName: String) = taskStores(storageEngineName) def init { cleanBaseDirs - createStreams startConsumers restoreStores stopConsumers @@ -68,6 +67,7 @@ class TaskStorageManager( private def cleanBaseDirs { debug("Cleaning base directories for stores.") + taskStores.keys.foreach(storeName => { val storagePartitionDir = TaskStorageManager.getStorePartitionDir(storeBaseDir, storeName, taskName) @@ -78,22 +78,6 @@ class TaskStorageManager( }) } - private def createStreams = { - info("Creating streams that are not present for changelog") - - for ((storeName, systemStream) <- changeLogSystemStreams) { - var systemAdmin = systemAdmins.getOrElse(systemStream.getSystem, throw new SamzaException("Unable to get systemAdmin for store " + storeName + " and systemStream" + systemStream)) - systemAdmin.createChangelogStream(systemStream.getStream, changeLogStreamPartitions) - } - - val changeLogMetadata = streamMetadataCache.getStreamMetadata(changeLogSystemStreams.values.toSet) - info("Got change log stream metadata: %s" format changeLogMetadata) - - changeLogOldestOffsets = getChangeLogOldestOffsetsForPartition(partition, changeLogMetadata) - info("Assigning oldest change log offsets for taskName %s: %s" format (taskName, changeLogOldestOffsets)) - } - - private def startConsumers { debug("Starting consumers for stores.") @@ -147,15 +131,4 @@ class TaskStorageManager( taskStores.values.foreach(_.stop) } - - - /** - * Builds a map from SystemStreamPartition to oldest offset for changelogs. - */ - private def getChangeLogOldestOffsetsForPartition(partition: Partition, inputStreamMetadata: Map[SystemStream, SystemStreamMetadata]): Map[SystemStream, String] = { - inputStreamMetadata - .mapValues(_.getSystemStreamPartitionMetadata.get(partition)) - .filter(_._2 != null) - .mapValues(_.getOldestOffset) - } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/cc5f3105/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala b/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala index ec1d749..98e92bc 100644 --- a/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala +++ b/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala @@ -130,8 +130,4 @@ class FileReaderSystemAdmin extends SystemAdmin with Logging { }) enterPosition } - - override def createChangelogStream(topicName: String, numOfChangeLogPartitions: Int) = { - throw new SamzaException("Method not implemented") - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/cc5f3105/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala index cff2fd6..a79ecca 100644 --- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala +++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala @@ -244,10 +244,6 @@ class TestOffsetManager { def getSystemStreamMetadata(streamNames: java.util.Set[String]) = Map[String, SystemStreamMetadata]() - - override def createChangelogStream(topicName: String, numOfChangeLogPartitions: Int) = { - new SamzaException("Method not implemented") - } } } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/cc5f3105/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala index 235c881..9fc1f56 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala @@ -19,18 +19,11 @@ package org.apache.samza.config - -import org.apache.samza.SamzaException - import scala.collection.JavaConversions._ import kafka.consumer.ConsumerConfig import java.util.Properties import kafka.producer.ProducerConfig import java.util.UUID -import scala.collection.JavaConverters._ -import org.apache.samza.system.kafka.KafkaSystemFactory -import scala.collection.mutable.ArrayBuffer - object KafkaConfig { val REGEX_RESOLVED_STREAMS = "job.config.rewriter.%s.regex" @@ -41,10 +34,6 @@ object KafkaConfig { val CHECKPOINT_REPLICATION_FACTOR = "task.checkpoint.replication.factor" val CHECKPOINT_SEGMENT_BYTES = "task.checkpoint.segment.bytes" - val CHANGELOG_STREAM_REPLICATION_FACTOR = "stores.%s.changelog.replication.factor" - val CHANGELOG_STREAM_KAFKA_SETTINGS = "stores.%s.changelog.kafka" - val CHANGELOG_STREAM_NAMES_REGEX = "stores\\..*\\.changelog$" - /** * Defines how low a queue can get for a single system/stream/partition * combination before trying to fetch more messages for it. @@ -95,30 +84,6 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) { def getRegexResolvedStreams(rewriterName: String) = getOption(KafkaConfig.REGEX_RESOLVED_STREAMS format rewriterName) def getRegexResolvedSystem(rewriterName: String) = getOption(KafkaConfig.REGEX_RESOLVED_SYSTEM format rewriterName) def getRegexResolvedInheritedConfig(rewriterName: String) = config.subset((KafkaConfig.REGEX_INHERITED_CONFIG format rewriterName) + ".", true) - def getChangelogStreamReplicationFactor(name: String) = getOption(KafkaConfig.CHANGELOG_STREAM_REPLICATION_FACTOR format name) - def getChangelogTopicNames() = { - val changelogConfigs = config.regexSubset(KafkaConfig.CHANGELOG_STREAM_NAMES_REGEX).asScala - val topicNamesList = ArrayBuffer[String]() - for((changelogConfig, changelogName) <- changelogConfigs){ - // Lookup the factory for this particular stream and verify if it's a kafka system - val changelogNameSplit = changelogName.split("\\.") - if(changelogNameSplit.length < 2) throw new SamzaException("Changelog name not in expected format") - val factoryName = config.get(String.format(SystemConfig.SYSTEM_FACTORY, changelogNameSplit(0))) - if(classOf[KafkaSystemFactory].getCanonicalName == factoryName){ - topicNamesList += changelogNameSplit(1) - } - } - topicNamesList.toList - } - - // Get all kafka properties for changelog stream topic creation - def getChangelogKafkaProperties(name: String) = { - val filteredConfigs = config.subset(KafkaConfig.CHANGELOG_STREAM_KAFKA_SETTINGS format name, true) - - val kafkaChangeLogProperties = new Properties - filteredConfigs.foreach{kv => kafkaChangeLogProperties.setProperty(kv._1, kv._2)} - kafkaChangeLogProperties - } // kafka config def getKafkaSystemConsumerConfig( http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/cc5f3105/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala index 6b5b9a4..5ac33ea 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala @@ -19,21 +19,25 @@ package org.apache.samza.system.kafka -import org.I0Itec.zkclient.ZkClient import org.apache.samza.Partition import org.apache.samza.SamzaException import org.apache.samza.system.SystemAdmin import org.apache.samza.system.SystemStreamMetadata import org.apache.samza.system.SystemStreamPartition -import org.apache.samza.util.{ClientUtilTopicMetadataStore, ExponentialSleepStrategy, Logging} +import org.apache.samza.util.ClientUtilTopicMetadataStore +import org.apache.samza.util.ExponentialSleepStrategy import kafka.api._ import kafka.consumer.SimpleConsumer -import kafka.common.{TopicExistsException, TopicAndPartition, ErrorMapping} -import java.util.{Properties, UUID} +import kafka.utils.Utils +import kafka.client.ClientUtils +import kafka.common.TopicAndPartition +import kafka.common.ErrorMapping +import kafka.cluster.Broker +import org.apache.samza.util.Logging +import java.util.UUID import scala.collection.JavaConversions._ import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata import kafka.consumer.ConsumerConfig -import kafka.admin.AdminUtils object KafkaSystemAdmin extends Logging { /** @@ -69,59 +73,40 @@ object KafkaSystemAdmin extends Logging { } /** - * A helper class that is used to construct the changelog stream specific information - * @param replicationFactor The number of replicas for the changelog stream - * @param kafkaProps The kafka specific properties that need to be used for changelog stream creation - */ -case class ChangeLogInfo(var replicationFactor: Int, var kafkaProps: Properties) - -/** * A Kafka-based implementation of SystemAdmin. */ class KafkaSystemAdmin( /** - * The system name to use when creating SystemStreamPartitions to return in - * the getSystemStreamMetadata responser. - */ + * The system name to use when creating SystemStreamPartitions to return in + * the getSystemStreamMetadata responser. + */ systemName: String, + // TODO whenever Kafka decides to make the Set[Broker] class public, let's switch to Set[Broker] here. /** - * List of brokers that are part of the Kafka system that we wish to - * interact with. The format is host1:port1,host2:port2. - */ + * List of brokers that are part of the Kafka system that we wish to + * interact with. The format is host1:port1,host2:port2. + */ brokerListString: String, /** - * The timeout to use for the simple consumer when fetching metadata from - * Kafka. Equivalent to Kafka's socket.timeout.ms configuration. - */ + * The timeout to use for the simple consumer when fetching metadata from + * Kafka. Equivalent to Kafka's socket.timeout.ms configuration. + */ timeout: Int = Int.MaxValue, /** - * The buffer size to use for the simple consumer when fetching metadata - * from Kafka. Equivalent to Kafka's socket.receive.buffer.bytes - * configuration. - */ + * The buffer size to use for the simple consumer when fetching metadata + * from Kafka. Equivalent to Kafka's socket.receive.buffer.bytes + * configuration. + */ bufferSize: Int = ConsumerConfig.SocketBufferSize, /** - * The client ID to use for the simple consumer when fetching metadata from - * Kafka. Equivalent to Kafka's client.id configuration. - */ - clientId: String = UUID.randomUUID.toString, - - /** - * A function that returns a Zookeeper client to connect to fetch the meta - * data information - */ - connectZk: () => ZkClient, - - /** - * Replication factor for the Changelog topic in kafka - * Kafka properties to be used during the Changelog topic creation + * The client ID to use for the simple consumer when fetching metadata from + * Kafka. Equivalent to Kafka's client.id configuration. */ - topicMetaInformation: Map[String, ChangeLogInfo] = Map[String, ChangeLogInfo]() - ) extends SystemAdmin with Logging { + clientId: String = UUID.randomUUID.toString) extends SystemAdmin with Logging { import KafkaSystemAdmin._ @@ -274,82 +259,4 @@ class KafkaSystemAdmin( offsets } - - private def createTopicInKafka(topicName: String, numKafkaChangeLogPartitions: Int) { - val retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy - info("Attempting to create change log topic %s." format topicName) - info("Using partition count "+ numKafkaChangeLogPartitions + " for creating change log topic") - val topicMetaInfo = topicMetaInformation.getOrElse(topicName, throw new KafkaChangeLogException("Unable to find topic information for topic " + topicName)) - retryBackoff.run( - loop => { - val zkClient = connectZk() - try { - AdminUtils.createTopic( - zkClient, - topicName, - numKafkaChangeLogPartitions, - topicMetaInfo.replicationFactor, - topicMetaInfo.kafkaProps) - } finally { - zkClient.close - } - - info("Created changelog topic %s." format topicName) - loop.done - }, - - (exception, loop) => { - exception match { - case e: TopicExistsException => - info("Changelog topic %s already exists." format topicName) - loop.done - case e: Exception => - warn("Failed to create topic %s: %s. Retrying." format(topicName, e)) - debug("Exception detail:", e) - } - } - ) - } - - private def validateTopicInKafka(topicName: String, numKafkaChangeLogPartitions: Int) { - val retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy - info("Validating changelog topic %s." format topicName) - retryBackoff.run( - loop => { - val metadataStore = new ClientUtilTopicMetadataStore(brokerListString, clientId, timeout) - val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(topicName), systemName, metadataStore.getTopicInfo) - val topicMetadata = topicMetadataMap(topicName) - ErrorMapping.maybeThrowException(topicMetadata.errorCode) - - val partitionCount = topicMetadata.partitionsMetadata.length - if (partitionCount < numKafkaChangeLogPartitions) { - throw new KafkaChangeLogException("Changelog topic validation failed for topic %s because partition count %s did not match expected partition count of %d" format(topicName, topicMetadata.partitionsMetadata.length, numKafkaChangeLogPartitions)) - } - - info("Successfully validated changelog topic %s." format topicName) - loop.done - }, - - (exception, loop) => { - exception match { - case e: KafkaChangeLogException => throw e - case e: Exception => - warn("While trying to validate topic %s: %s. Retrying." format(topicName, e)) - debug("Exception detail:", e) - } - } - ) - } - - /** - * Exception to be thrown when the change log stream creation or validation has failed - */ - class KafkaChangeLogException(s: String, t: Throwable) extends SamzaException(s, t) { - def this(s: String) = this(s, null) - } - - override def createChangelogStream(topicName: String, numKafkaChangeLogPartitions: Int) = { - createTopicInKafka(topicName, numKafkaChangeLogPartitions) - validateTopicInKafka(topicName, numKafkaChangeLogPartitions) - } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/cc5f3105/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala index 9ab836c..4ed5e88 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala @@ -19,9 +19,6 @@ package org.apache.samza.system.kafka - -import kafka.utils.ZKStringSerializer -import org.I0Itec.zkclient.ZkClient import org.apache.samza.util.KafkaUtil import org.apache.samza.config.Config import org.apache.samza.metrics.MetricsRegistry @@ -98,23 +95,12 @@ class KafkaSystemFactory extends SystemFactory { val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, clientId) val timeout = consumerConfig.socketTimeoutMs val bufferSize = consumerConfig.socketReceiveBufferBytes - val topicNames = config.getChangelogTopicNames() - var topicMetaInformation = Map[String, ChangeLogInfo]() - - // Construct the meta information for each topic, if the replication factor is not defined, we use 2 as the number of replicas for the change log stream. - for(topicName <- topicNames) { - val replicationFactor = config.getChangelogStreamReplicationFactor(topicName).getOrElse("2").toInt - val changelogInfo = ChangeLogInfo(replicationFactor, config.getChangelogKafkaProperties(topicName)) - topicMetaInformation += topicName -> changelogInfo - } new KafkaSystemAdmin( systemName, brokerListString, timeout, bufferSize, - clientId, - () => new ZkClient(consumerConfig.zkConnect, 6000, 6000, ZKStringSerializer), - topicMetaInformation) + clientId) } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/cc5f3105/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala index f1b7511..d660b91 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala @@ -19,9 +19,11 @@ package org.apache.samza.util -import org.apache.samza.config.{Config, ConfigException} +import org.apache.samza.config.{KafkaConfig, Config, ConfigException} import org.apache.samza.config.JobConfig.Config2Job import java.util.concurrent.atomic.AtomicLong +import kafka.client.ClientUtils +import org.apache.samza.SamzaException object KafkaUtil { val counter = new AtomicLong(0) http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/cc5f3105/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala index a5af465..5ceb109 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala @@ -40,7 +40,6 @@ import kafka.zk.EmbeddedZookeeper import org.I0Itec.zkclient.ZkClient import org.apache.samza.Partition -import org.apache.samza.config.MapConfig import org.apache.samza.system.SystemStreamMetadata import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata import org.apache.samza.system.SystemStreamPartition @@ -165,10 +164,6 @@ object TestKafkaSystemAdmin { class TestKafkaSystemAdmin { import TestKafkaSystemAdmin._ - val systemName = "test" - // Provide a random zkAddress, the system admin tries to connect only when a topic is created/validated - val systemAdmin = new KafkaSystemAdmin(systemName, brokers, connectZk = () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer)) - def testShouldAssembleMetadata { val oldestOffsets = Map( new SystemStreamPartition("test", "stream1", new Partition(0)) -> "o1", @@ -210,6 +205,8 @@ class TestKafkaSystemAdmin { @Test def testShouldGetOldestNewestAndNextOffsets { + val systemName = "test" + val systemAdmin = new KafkaSystemAdmin(systemName, brokers) // Create an empty topic with 50 partitions, but with no offsets. createTopic @@ -274,7 +271,7 @@ class TestKafkaSystemAdmin { @Test def testNonExistentTopic { - + val systemAdmin = new KafkaSystemAdmin("test", brokers) val initialOffsets = systemAdmin.getSystemStreamMetadata(Set("non-existent-topic")) val metadata = initialOffsets.getOrElse("non-existent-topic", fail("missing metadata")) assertEquals(metadata, new SystemStreamMetadata("non-existent-topic", Map( @@ -283,6 +280,7 @@ class TestKafkaSystemAdmin { @Test def testOffsetsAfter { + val systemAdmin = new KafkaSystemAdmin("test", brokers) val ssp1 = new SystemStreamPartition("test-system", "test-stream", new Partition(0)) val ssp2 = new SystemStreamPartition("test-system", "test-stream", new Partition(1)) val offsetsAfter = systemAdmin.getOffsetsAfter(Map( @@ -292,7 +290,7 @@ class TestKafkaSystemAdmin { assertEquals("3", offsetsAfter(ssp2)) } - class KafkaSystemAdminWithTopicMetadataError extends KafkaSystemAdmin("test", brokers, connectZk = () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer)) { + class KafkaSystemAdminWithTopicMetadataError extends KafkaSystemAdmin("test", brokers) { import kafka.api.{ TopicMetadata, TopicMetadataResponse } // Simulate Kafka telling us that the leader for the topic is not available http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/cc5f3105/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java b/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java index c0a20af..fa1d51b 100644 --- a/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java +++ b/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java @@ -23,7 +23,6 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; import org.apache.samza.Partition; -import org.apache.samza.SamzaException; import org.apache.samza.system.SystemAdmin; import org.apache.samza.system.SystemStreamMetadata; import org.apache.samza.system.SystemStreamPartition; @@ -54,12 +53,7 @@ public class MockSystemAdmin implements SystemAdmin { return metadata; } - @Override - public void createChangelogStream(String streamName, int numOfPartitions) { - throw new SamzaException("Method not implemented"); - } - - @Override + @Override public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets) { Map<SystemStreamPartition, String> offsetsAfter = new HashMap<SystemStreamPartition, String>(); http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/cc5f3105/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala ---------------------------------------------------------------------- diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala index b23968b..118f5ee 100644 --- a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala +++ b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala @@ -22,7 +22,6 @@ package org.apache.samza.test.integration import java.util.Properties import java.util.concurrent.CountDownLatch import java.util.concurrent.TimeUnit -import java.util.logging.{Level, LogManager, Handler, Logger} import kafka.admin.AdminUtils import kafka.common.ErrorMapping @@ -88,9 +87,6 @@ object TestStatefulTask { val props1 = TestUtils.createBrokerConfig(brokerId1, port1) val props2 = TestUtils.createBrokerConfig(brokerId2, port2) val props3 = TestUtils.createBrokerConfig(brokerId3, port3) - props1.setProperty("auto.create.topics.enable","false") - props2.setProperty("auto.create.topics.enable","false") - props3.setProperty("auto.create.topics.enable","false") val config = new java.util.Properties() val brokers = "localhost:%d,localhost:%d,localhost:%d" format (port1, port2, port3) @@ -110,7 +106,6 @@ object TestStatefulTask { @BeforeClass def beforeSetupServers { - zookeeper = new EmbeddedZookeeper(zkConnect) server1 = TestUtils.createServer(new KafkaConfig(props1)) server2 = TestUtils.createServer(new KafkaConfig(props2)) @@ -129,10 +124,16 @@ object TestStatefulTask { INPUT_TOPIC, TOTAL_TASK_NAMES, REPLICATION_FACTOR) + + AdminUtils.createTopic( + zkClient, + STATE_TOPIC, + TOTAL_TASK_NAMES, + REPLICATION_FACTOR) } def validateTopics { - val topics = Set(INPUT_TOPIC) + val topics = Set(STATE_TOPIC, INPUT_TOPIC) var done = false var retries = 0 @@ -207,7 +208,6 @@ class TestStatefulTask { "stores.mystore.key.serde" -> "string", "stores.mystore.msg.serde" -> "string", "stores.mystore.changelog" -> "kafka.mystore", - "stores.mystore.changelog.replication.factor" -> "1", "systems.kafka.samza.factory" -> "org.apache.samza.system.kafka.KafkaSystemFactory", // Always start consuming at offset 0. This avoids a race condition between
