Repository: incubator-samza Updated Branches: refs/heads/master 1be188b6c -> d733ed961
SAMZA-393; enable log compaction for checkpoint topics in kafka Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/d733ed96 Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/d733ed96 Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/d733ed96 Branch: refs/heads/master Commit: d733ed961fec0a08a6706be22ca8a6537693bed2 Parents: 1be188b Author: Chris Riccomini <[email protected]> Authored: Tue Aug 26 09:02:39 2014 -0700 Committer: Chris Riccomini <[email protected]> Committed: Tue Aug 26 09:02:39 2014 -0700 ---------------------------------------------------------------------- .../kafka/KafkaCheckpointManager.scala | 7 +++-- .../kafka/KafkaCheckpointManagerFactory.scala | 30 +++++++++++++++----- .../kafka/TestKafkaCheckpointManager.scala | 26 ++++++++++++----- 3 files changed, 47 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/d733ed96/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala index f3e954a..1d5627d 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala @@ -44,6 +44,7 @@ import org.apache.samza.system.kafka.TopicMetadataCache import org.apache.samza.util.ExponentialSleepStrategy import org.apache.samza.util.TopicMetadataStore import scala.collection.mutable +import java.util.Properties /** * Kafka checkpoint manager is used to store checkpoints in a Kafka topic. @@ -65,7 +66,8 @@ class KafkaCheckpointManager( connectZk: () => ZkClient, systemStreamPartitionGrouperFactoryString: String, retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy, - serde: CheckpointSerde = new CheckpointSerde) extends CheckpointManager with Logging { + serde: CheckpointSerde = new CheckpointSerde, + checkpointTopicProperties: Properties = new Properties) extends CheckpointManager with Logging { import KafkaCheckpointManager._ var taskNames = Set[TaskName]() @@ -355,7 +357,8 @@ class KafkaCheckpointManager( zkClient, checkpointTopic, 1, - replicationFactor) + replicationFactor, + checkpointTopicProperties) } finally { zkClient.close } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/d733ed96/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala index 977330f..7ab50a3 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala @@ -31,13 +31,33 @@ import org.apache.samza.config.JobConfig.Config2Job import org.apache.samza.config.KafkaConfig.Config2Kafka import org.apache.samza.metrics.MetricsRegistry import org.apache.samza.util.{ KafkaUtil, ClientUtilTopicMetadataStore } +import java.util.Properties +import scala.collection.JavaConversions._ object KafkaCheckpointManagerFactory { /** * Version number to track the format of the checkpoint log */ val CHECKPOINT_LOG_VERSION_NUMBER = 1 + + val INJECTED_PRODUCER_PROPERTIES = Map( + "request.required.acks" -> "-1", + // Forcibly disable compression because Kafka doesn't support compression + // on log compacted topics. Details in SAMZA-393. + "compression.codec" -> "none", + "producer.type" -> "sync", + // Subtract one here, because DefaultEventHandler calls messageSendMaxRetries + 1. + "message.send.max.retries" -> (Integer.MAX_VALUE - 1).toString) + + // Set the checkpoint topic configs to have a very small segment size and + // enable log compaction. This keeps job startup time small since there + // are fewer useless (overwritten) messages to read from the checkpoint + // topic. + val CHECKPOINT_TOPIC_PROPERTIES = (new Properties /: Map( + "cleanup.policy" -> "compact", + "segment.bytes" -> "26214400")) { case (props, (k, v)) => props.put(k, v); props } } + class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Logging { import KafkaCheckpointManagerFactory._ @@ -46,15 +66,10 @@ class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Loggin val systemName = config .getCheckpointSystem .getOrElse(throw new SamzaException("no system defined for Kafka's checkpoint manager.")) - val injectedProducerProps = Map( - "request.required.acks" -> "-1", - "producer.type" -> "sync", - // Subtract one here, because DefaultEventHandler calls messageSendMaxRetries + 1. - "message.send.max.retries" -> (Integer.MAX_VALUE - 1).toString) val producerConfig = config.getKafkaSystemProducerConfig( systemName, clientId, - injectedProducerProps) + INJECTED_PRODUCER_PROPERTIES) val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, clientId) val replicationFactor = config.getCheckpointReplicationFactor.getOrElse("3").toInt val socketTimeout = consumerConfig.socketTimeoutMs @@ -90,7 +105,8 @@ class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Loggin metadataStore, connectProducer, connectZk, - systemStreamPartitionGrouperFactoryString) + systemStreamPartitionGrouperFactoryString, + checkpointTopicProperties = CHECKPOINT_TOPIC_PROPERTIES) } private def getTopic(jobName: String, jobId: String) = http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/d733ed96/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala index 34fe6dd..f556479 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala @@ -37,14 +37,17 @@ import org.apache.samza.container.TaskName import org.apache.samza.serializers.CheckpointSerde import org.apache.samza.system.SystemStreamPartition import org.apache.samza.util.{ ClientUtilTopicMetadataStore, TopicMetadataStore } -import org.apache.samza.{SamzaException, Partition} +import org.apache.samza.{ SamzaException, Partition } import org.junit.Assert._ -import org.junit.{AfterClass, BeforeClass, Test} +import org.junit.{ AfterClass, BeforeClass, Test } import scala.collection.JavaConversions._ import scala.collection._ import org.apache.samza.container.grouper.stream.GroupByPartitionFactory +import kafka.admin.AdminUtils object TestKafkaCheckpointManager { + val checkpointTopic = "checkpoint-topic" + val serdeCheckpointTopic = "checkpoint-topic-invalid-serde" val zkConnect: String = TestZKUtils.zookeeperConnect var zkClient: ZkClient = null val zkConnectionTimeout = 6000 @@ -68,6 +71,7 @@ object TestKafkaCheckpointManager { config.put("metadata.broker.list", brokers) config.put("producer.type", "sync") config.put("request.required.acks", "-1") + config.putAll(KafkaCheckpointManagerFactory.INJECTED_PRODUCER_PROPERTIES) val producerConfig = new ProducerConfig(config) val partition = new Partition(0) val cp1 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", partition) -> "123")) @@ -113,8 +117,14 @@ class TestKafkaCheckpointManager { val taskName = new TaskName(partition.toString) kcm.register(taskName) kcm.start - var readCp = kcm.readLastCheckpoint(taskName) + // check that log compaction is enabled. + val zkClient = new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer) + val topicConfig = AdminUtils.fetchTopicConfig(zkClient, checkpointTopic) + zkClient.close + assertEquals("compact", topicConfig.get("cleanup.policy")) + assertEquals("26214400", topicConfig.get("segment.bytes")) // read before topic exists should result in a null checkpoint + var readCp = kcm.readLastCheckpoint(taskName) assertNull(readCp) // create topic the first time around kcm.writeCheckpoint(taskName, cp1) @@ -157,7 +167,7 @@ class TestKafkaCheckpointManager { private def getKafkaCheckpointManager = new KafkaCheckpointManager( clientId = "some-client-id", - checkpointTopic = "checkpoint-topic", + checkpointTopic = checkpointTopic, systemName = "kafka", replicationFactor = 3, socketTimeout = 30000, @@ -166,12 +176,13 @@ class TestKafkaCheckpointManager { metadataStore = metadataStore, connectProducer = () => new Producer[Array[Byte], Array[Byte]](producerConfig), connectZk = () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer), - systemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString) + systemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString, + checkpointTopicProperties = KafkaCheckpointManagerFactory.CHECKPOINT_TOPIC_PROPERTIES) // inject serde. Kafka exceptions will be thrown when serde.fromBytes is called private def getKafkaCheckpointManagerWithInvalidSerde(exception: String) = new KafkaCheckpointManager( clientId = "some-client-id-invalid-serde", - checkpointTopic = "checkpoint-topic-invalid-serde", + checkpointTopic = serdeCheckpointTopic, systemName = "kafka", replicationFactor = 3, socketTimeout = 30000, @@ -181,7 +192,8 @@ class TestKafkaCheckpointManager { connectProducer = () => new Producer[Array[Byte], Array[Byte]](producerConfig), connectZk = () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer), systemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString, - serde = new InvalideSerde(exception)) + serde = new InvalideSerde(exception), + checkpointTopicProperties = KafkaCheckpointManagerFactory.CHECKPOINT_TOPIC_PROPERTIES) class InvalideSerde(exception: String) extends CheckpointSerde { override def fromBytes(bytes: Array[Byte]): Checkpoint = {
