This is an automated email from the ASF dual-hosted git repository. lhaiesp pushed a commit to branch 1.3.1 in repository https://gitbox.apache.org/repos/asf/samza.git
commit e250678fa4bb238ec82414d20ba2212da29a1f27 Author: shanthoosh <[email protected]> AuthorDate: Tue Jan 14 19:02:13 2020 -0800 SAMZA-2431: Fix the checkpoint and changelog topic auto-creation. (#1251) * Fix the checkpoint and changelog topic creation configurations. * Address review comments. * Address review comments. --- .../java/org/apache/samza/system/StreamSpec.java | 4 +- .../samza/system/kafka/KafkaSystemAdmin.java | 7 +++- .../org/apache/samza/config/KafkaConfig.scala | 6 ++- .../system/kafka/TestKafkaSystemAdminJava.java | 26 ++++++++++++ .../org/apache/samza/config/TestKafkaConfig.scala | 46 +++++++++++++++++++++- 5 files changed, 82 insertions(+), 7 deletions(-) diff --git a/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java b/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java index a1ad5e4..c122371 100644 --- a/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java +++ b/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java @@ -19,6 +19,8 @@ package org.apache.samza.system; +import com.google.common.base.Joiner; + import java.io.Serializable; import java.util.Collections; import java.util.HashMap; @@ -269,6 +271,6 @@ public class StreamSpec implements Serializable { @Override public String toString() { - return String.format("StreamSpec: id=%s, systemName=%s, pName=%s, partCount=%d.", id, systemName, physicalName, partitionCount); + return String.format("StreamSpec: id=%s, systemName=%s, pName=%s, partCount=%d, config=%s.", id, systemName, physicalName, partitionCount, Joiner.on(",").withKeyValueSeparator("=").join(config)); } } diff --git a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java index e5d6af1..ecb95a9 100644 --- a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java +++ b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java @@ -541,8 +541,11 @@ public class KafkaSystemAdmin implements SystemAdmin { new KafkaStreamSpec(spec.getId(), spec.getPhysicalName(), systemName, 1, coordinatorStreamReplicationFactor, coordinatorStreamProperties); } else if (spec.isCheckpointStream()) { - kafkaSpec = KafkaStreamSpec.fromSpec(StreamSpec.createCheckpointStreamSpec(spec.getPhysicalName(), systemName)) - .copyWithReplicationFactor(Integer.parseInt(new KafkaConfig(config).getCheckpointReplicationFactor().get())); + Properties checkpointTopicProperties = new Properties(); + checkpointTopicProperties.putAll(spec.getConfig()); + kafkaSpec = KafkaStreamSpec.fromSpec(StreamSpec.createCheckpointStreamSpec(spec.getPhysicalName(), spec.getSystemName())) + .copyWithReplicationFactor(Integer.parseInt(new KafkaConfig(config).getCheckpointReplicationFactor().get())) + .copyWithProperties(checkpointTopicProperties); } else if (intermediateStreamProperties.containsKey(spec.getId())) { kafkaSpec = KafkaStreamSpec.fromSpec(spec); Properties properties = kafkaSpec.getProperties(); diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala index 75fbb6b..3b5f5f3 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala @@ -316,7 +316,6 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) { val filteredConfigs = config.subset(KafkaConfig.CHANGELOG_STREAM_KAFKA_SETTINGS format name, true) val kafkaChangeLogProperties = new Properties - val appConfig = new ApplicationConfig(config) // SAMZA-1600: do not use the combination of "compact,delete" as cleanup policy until we pick up Kafka broker 0.11.0.57, // 1.0.2, or 1.1.0 (see KAFKA-6568) @@ -325,7 +324,10 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) { // - Set topic TTL to be the same as RocksDB TTL Option(config.get("stores.%s.rocksdb.ttl.ms" format name)) match { case Some(rocksDbTtl) => - if (!config.containsKey("stores.%s.changelog.kafka.cleanup.policy" format name)) { + if (!rocksDbTtl.isEmpty && rocksDbTtl.toInt < 0) { + kafkaChangeLogProperties.setProperty("cleanup.policy", "compact") + kafkaChangeLogProperties.setProperty("max.message.bytes", getChangelogStreamMaxMessageByte(name)) + } else if (!config.containsKey("stores.%s.changelog.kafka.cleanup.policy" format name)) { kafkaChangeLogProperties.setProperty("cleanup.policy", "delete") if (!config.containsKey("stores.%s.changelog.kafka.retention.ms" format name)) { kafkaChangeLogProperties.setProperty("retention.ms", String.valueOf(rocksDbTtl)) diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java index 7ca03f3..82d635f 100644 --- a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java +++ b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java @@ -115,6 +115,32 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin { } @Test + public void testToKafkaSpecForCheckpointStreamShouldReturnTheCorrectStreamSpecByPreservingTheConfig() { + String topicName = "testStream"; + String streamId = "samza-internal-checkpoint-stream-id"; + int partitionCount = 1; + Map<String, String> map = new HashMap<>(); + map.put("cleanup.policy", "compact"); + map.put("replication.factor", "3"); + map.put("segment.bytes", "536870912"); + map.put("delete.retention.ms", "86400000"); + + Config config = new MapConfig(map); + + StreamSpec spec = new StreamSpec(streamId, topicName, SYSTEM, partitionCount, config); + KafkaSystemAdmin kafkaSystemAdmin = systemAdmin(); + KafkaStreamSpec kafkaStreamSpec = kafkaSystemAdmin.toKafkaSpec(spec); + System.out.println(kafkaStreamSpec); + assertEquals(streamId, kafkaStreamSpec.getId()); + assertEquals(topicName, kafkaStreamSpec.getPhysicalName()); + assertEquals(partitionCount, kafkaStreamSpec.getPartitionCount()); + assertEquals(3, kafkaStreamSpec.getReplicationFactor()); + assertEquals("compact", kafkaStreamSpec.getConfig().get("cleanup.policy")); + assertEquals("536870912", kafkaStreamSpec.getConfig().get("segment.bytes")); + assertEquals("86400000", kafkaStreamSpec.getConfig().get("delete.retention.ms")); + } + + @Test public void testToKafkaSpec() { String topicName = "testStream"; diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala index 8558a85..00b103d 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala @@ -22,14 +22,14 @@ package org.apache.samza.config import java.util.Properties import java.util.concurrent.TimeUnit -import org.apache.samza.config.factories.PropertiesConfigFactory import org.junit.Assert._ +import org.junit.After +import org.junit.Before import org.junit.Test import scala.collection.JavaConverters._ import org.apache.kafka.common.serialization.ByteArraySerializer import org.apache.kafka.clients.producer.ProducerConfig -import org.junit.Before class TestKafkaConfig { @@ -47,6 +47,10 @@ class TestKafkaConfig { props.setProperty(JobConfig.JOB_NAME, "jobName") } + @After + def clearUpProperties(): Unit = { + props.clear() + } @Test def testStreamLevelFetchSizeOverride() { @@ -82,6 +86,44 @@ class TestKafkaConfig { } @Test + def testChangeLogPropertiesShouldReturnCorrectTopicConfigurationForInfiniteTTLStores(): Unit = { + val props = new Properties + props.setProperty(KAFKA_PRODUCER_PROPERTY_PREFIX + "bootstrap.servers", "localhost:9092") + props.setProperty("systems." + SYSTEM_NAME + ".consumer.zookeeper.connect", "localhost:2181/") + props.setProperty(JobConfig.JOB_NAME, "jobName") + + props.setProperty("stores.test1.changelog", "kafka.mychangelog1") + props.setProperty("stores.test1.rocksdb.ttl.ms", "-1") + + val mapConfig = new MapConfig(props.asScala.asJava) + val kafkaConfig = new KafkaConfig(mapConfig) + val kafkaProperties = kafkaConfig.getChangelogKafkaProperties("test1") + assertEquals("compact", kafkaProperties.getProperty("cleanup.policy")) + assertEquals("536870912", kafkaProperties.getProperty("segment.bytes")) + assertEquals("1000012", kafkaProperties.getProperty("max.message.bytes")) + assertEquals("86400000", kafkaProperties.getProperty("delete.retention.ms")) + } + + @Test + def testChangeLogPropertiesShouldReturnCorrectTopicConfigurationForStoresWithEmptyRocksDBTTL(): Unit = { + val props = new Properties + props.setProperty(KAFKA_PRODUCER_PROPERTY_PREFIX + "bootstrap.servers", "localhost:9092") + props.setProperty("systems." + SYSTEM_NAME + ".consumer.zookeeper.connect", "localhost:2181/") + props.setProperty(JobConfig.JOB_NAME, "jobName") + + props.setProperty("stores.test1.changelog", "kafka.mychangelog1") + + val mapConfig = new MapConfig(props.asScala.asJava) + val kafkaConfig = new KafkaConfig(mapConfig) + val kafkaProperties = kafkaConfig.getChangelogKafkaProperties("test1") + assertEquals("compact", kafkaProperties.getProperty("cleanup.policy")) + assertEquals("536870912", kafkaProperties.getProperty("segment.bytes")) + assertEquals("86400000", kafkaProperties.getProperty("delete.retention.ms")) + assertEquals("1000012", kafkaProperties.getProperty("max.message.bytes")) + + } + + @Test def testChangeLogProperties() { props.setProperty("job.changelog.system", SYSTEM_NAME) props.setProperty("systems." + SYSTEM_NAME + ".samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory")
